diff --git a/lib/selectors/user-selectors.js b/lib/selectors/user-selectors.js index eb6080307..b65cb2ae5 100644 --- a/lib/selectors/user-selectors.js +++ b/lib/selectors/user-selectors.js @@ -1,348 +1,347 @@ // @flow import _memoize from 'lodash/memoize.js'; import { createSelector } from 'reselect'; import bots from '../facts/bots.js'; import { getAvatarForUser, getRandomDefaultEmojiAvatar, } from '../shared/avatar-utils.js'; import { getSingleOtherUser } from '../shared/thread-utils.js'; +import { type P2PMessageRecipient } from '../tunnelbroker/peer-to-peer-context.js'; import { type AuxUserInfos, type AuxUserInfo, } from '../types/aux-user-types.js'; import type { ClientEmojiAvatar } from '../types/avatar-types'; import { type IdentityPlatformDetails, identityDeviceTypes, } from '../types/identity-service-types.js'; import type { RelativeMemberInfo, RawThreadInfo, } from '../types/minimally-encoded-thread-permissions-types.js'; import type { BaseAppState } from '../types/redux-types.js'; import { userRelationshipStatus } from '../types/relationship-types.js'; import { threadTypes } from '../types/thread-types-enum.js'; import type { RawThreadInfos } from '../types/thread-types.js'; import type { UserInfos, RelativeUserInfo, AccountUserInfo, CurrentUserInfo, } from '../types/user-types.js'; import { entries, values } from '../utils/objects.js'; // Used for specific message payloads that include an array of user IDs, ie. // array of initial users, array of added users function userIDsToRelativeUserInfos( userIDs: $ReadOnlyArray, viewerID: ?string, userInfos: UserInfos, ): RelativeUserInfo[] { const relativeUserInfos: RelativeUserInfo[] = []; for (const userID of userIDs) { const username = userInfos[userID] ? userInfos[userID].username : null; const relativeUserInfo = { id: userID, username, isViewer: userID === viewerID, }; if (userID === viewerID) { relativeUserInfos.unshift(relativeUserInfo); } else { relativeUserInfos.push(relativeUserInfo); } } return relativeUserInfos; } function getRelativeMemberInfos( threadInfo: ?RawThreadInfo, currentUserID: ?string, userInfos: UserInfos, ): $ReadOnlyArray { const relativeMemberInfos: RelativeMemberInfo[] = []; if (!threadInfo) { return relativeMemberInfos; } const memberInfos = threadInfo.members; for (const memberInfo of memberInfos) { if (!memberInfo.role) { continue; } const username = userInfos[memberInfo.id] ? userInfos[memberInfo.id].username : null; const { id, role, isSender, minimallyEncoded } = memberInfo; if (memberInfo.id === currentUserID) { relativeMemberInfos.unshift({ id, role, isSender, minimallyEncoded, username, isViewer: true, }); } else { relativeMemberInfos.push({ id, role, isSender, minimallyEncoded, username, isViewer: false, }); } } return relativeMemberInfos; } const emptyArray: $ReadOnlyArray = []; // Includes current user at the start const baseRelativeMemberInfoSelectorForMembersOfThread: ( threadID: ?string, ) => (state: BaseAppState<>) => $ReadOnlyArray = ( threadID: ?string, ) => { if (!threadID) { return () => emptyArray; } return createSelector( (state: BaseAppState<>) => state.threadStore.threadInfos[threadID], (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo.id, (state: BaseAppState<>) => state.userStore.userInfos, getRelativeMemberInfos, ); }; const relativeMemberInfoSelectorForMembersOfThread: ( threadID: ?string, ) => (state: BaseAppState<>) => $ReadOnlyArray = _memoize( baseRelativeMemberInfoSelectorForMembersOfThread, ); const userInfoSelectorForPotentialMembers: (state: BaseAppState<>) => { [id: string]: AccountUserInfo, } = createSelector( (state: BaseAppState<>) => state.userStore.userInfos, (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo.id, ( userInfos: UserInfos, currentUserID: ?string, ): { [id: string]: AccountUserInfo } => { const availableUsers: { [id: string]: AccountUserInfo } = {}; for (const id in userInfos) { const { username, relationshipStatus } = userInfos[id]; if (id === currentUserID || !username) { continue; } if ( relationshipStatus !== userRelationshipStatus.BLOCKED_VIEWER && relationshipStatus !== userRelationshipStatus.BOTH_BLOCKED ) { availableUsers[id] = { id, username, relationshipStatus }; } } return availableUsers; }, ); const isLoggedIn = (state: BaseAppState<>): boolean => !!( state.currentUserInfo && !state.currentUserInfo.anonymous && state.dataLoaded ); const isLoggedInToKeyserver: ( keyserverID: ?string, ) => (state: BaseAppState<>) => boolean = _memoize( (keyserverID: ?string) => (state: BaseAppState<>) => { if (!keyserverID) { return false; } const cookie = state.keyserverStore.keyserverInfos[keyserverID]?.cookie; return !!cookie && cookie.startsWith('user='); }, ); const usersWithPersonalThreadSelector: ( state: BaseAppState<>, ) => $ReadOnlySet = createSelector( (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo.id, (state: BaseAppState<>) => state.threadStore.threadInfos, (viewerID: ?string, threadInfos: RawThreadInfos) => { const personalThreadMembers = new Set(); for (const threadID in threadInfos) { const thread = threadInfos[threadID]; if ( thread.type !== threadTypes.GENESIS_PERSONAL || !thread.members.find(member => member.id === viewerID) ) { continue; } const otherMemberID = getSingleOtherUser(thread, viewerID); if (otherMemberID) { personalThreadMembers.add(otherMemberID); } } return personalThreadMembers; }, ); const savedEmojiAvatarSelectorForCurrentUser: ( state: BaseAppState<>, ) => () => ClientEmojiAvatar = createSelector( (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo, (currentUser: ?CurrentUserInfo) => { return () => { let userAvatar = getAvatarForUser(currentUser); if (userAvatar.type !== 'emoji') { userAvatar = getRandomDefaultEmojiAvatar(); } return userAvatar; }; }, ); const getRelativeUserIDs: (state: BaseAppState<>) => $ReadOnlyArray = createSelector( (state: BaseAppState<>) => state.userStore.userInfos, (userInfos: UserInfos): $ReadOnlyArray => Object.keys(userInfos), ); const usersWithMissingDeviceListSelector: ( state: BaseAppState<>, ) => $ReadOnlyArray = createSelector( getRelativeUserIDs, (state: BaseAppState<>) => state.auxUserStore.auxUserInfos, ( userIDs: $ReadOnlyArray, auxUserInfos: AuxUserInfos, ): $ReadOnlyArray => userIDs.filter( userID => (!auxUserInfos[userID] || !auxUserInfos[userID].deviceList) && userID !== bots.commbot.userID, ), ); // Foreign Peer Devices are all devices of users we are aware of, // but not our own devices. const getForeignPeerDeviceIDs: ( state: BaseAppState<>, ) => $ReadOnlyArray = createSelector( (state: BaseAppState<>) => state.auxUserStore.auxUserInfos, (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo.id, ( auxUserInfos: AuxUserInfos, currentUserID: ?string, ): $ReadOnlyArray => entries(auxUserInfos) .map(([userID, auxUserInfo]: [string, AuxUserInfo]) => userID !== currentUserID && auxUserInfo.deviceList?.devices ? auxUserInfo.deviceList.devices : [], ) .flat(), ); export type DeviceIDAndPlatformDetails = { +deviceID: string, +platformDetails: ?IdentityPlatformDetails, }; const getOwnPeerDevices: ( state: BaseAppState<>, ) => $ReadOnlyArray = createSelector( (state: BaseAppState<>) => state.auxUserStore.auxUserInfos, (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo.id, ( auxUserInfos: AuxUserInfos, currentUserID: ?string, ): $ReadOnlyArray => { if (!currentUserID) { return []; } const devices = auxUserInfos[currentUserID]?.deviceList?.devices; if (!devices) { return []; } return devices.map(deviceID => ({ deviceID, platformDetails: auxUserInfos[currentUserID].devicesPlatformDetails?.[deviceID], })); }, ); function getKeyserverDeviceID( devices: $ReadOnlyArray, ): ?string { const keyserverDevice = devices.find( device => device.platformDetails?.deviceType === identityDeviceTypes.KEYSERVER, ); return keyserverDevice ? keyserverDevice.deviceID : null; } const getAllPeerDevices: (state: BaseAppState<>) => $ReadOnlyArray = createSelector( (state: BaseAppState<>) => state.auxUserStore.auxUserInfos, (auxUserInfos: AuxUserInfos): $ReadOnlyArray => values(auxUserInfos) .map( (auxUserInfo: AuxUserInfo) => auxUserInfo.deviceList?.devices ?? [], ) .flat(), ); const getAllPeerUserIDAndDeviceIDs: ( state: BaseAppState<>, -) => $ReadOnlyArray<{ +userID: string, +deviceID: string }> = createSelector( +) => $ReadOnlyArray = createSelector( (state: BaseAppState<>) => state.auxUserStore.auxUserInfos, - ( - auxUserInfos: AuxUserInfos, - ): $ReadOnlyArray<{ +userID: string, +deviceID: string }> => + (auxUserInfos: AuxUserInfos): $ReadOnlyArray => entries(auxUserInfos).flatMap( ([userID, { deviceList }]: [string, AuxUserInfo]) => deviceList?.devices.map(deviceID => ({ userID, deviceID, })) ?? [], ), ); const getOwnPrimaryDeviceID: (state: BaseAppState<>) => ?string = createSelector( (state: BaseAppState<>) => state.auxUserStore.auxUserInfos, (state: BaseAppState<>) => state.currentUserInfo && state.currentUserInfo.id, (auxUserInfos: AuxUserInfos, currentUserID: ?string): ?string => currentUserID && auxUserInfos[currentUserID]?.deviceList?.devices[0], ); export { userIDsToRelativeUserInfos, getRelativeMemberInfos, relativeMemberInfoSelectorForMembersOfThread, userInfoSelectorForPotentialMembers, isLoggedIn, isLoggedInToKeyserver, usersWithPersonalThreadSelector, savedEmojiAvatarSelectorForCurrentUser, getRelativeUserIDs, usersWithMissingDeviceListSelector, getForeignPeerDeviceIDs, getOwnPeerDevices, getKeyserverDeviceID, getAllPeerDevices, getAllPeerUserIDAndDeviceIDs, getOwnPrimaryDeviceID, }; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index dbccd8da4..55175b5cd 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,456 +1,457 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { getAllPeerDevices } from '../selectors/user-selectors.js'; import { type AuthMetadata, IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; import type { NotificationsCreationData } from '../types/notif-types.js'; import { type OutboundP2PMessage } from '../types/sqlite-types.js'; import { getConfig } from '../utils/config.js'; import type { DeviceSessionCreationRequest } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { entries } from '../utils/objects.js'; import { ephemeralEncryptAndSendMessageToPeer, handleOutboundP2PMessage, type HandleOutboundP2PMessageResult, } from '../utils/peer-to-peer-communication-utils.js'; import { useSelector } from '../utils/redux-utils.js'; +export type P2PMessageRecipient = { +userID: string, +deviceID: string }; + type PeerToPeerContextType = { +processOutboundMessages: ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => void, +getDMOpsSendingPromise: () => { +promise: Promise, +dmOpID: string, }, +broadcastEphemeralMessage: ( contentPayload: string, - recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, + recipients: $ReadOnlyArray, authMetadata: AuthMetadata, ) => Promise, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; export type ProcessOutboundP2PMessagesResult = | { +result: 'success' } | { +result: 'failure', +failedMessageIDs: $ReadOnlyArray, }; async function createMissingSession( userDevicesWithoutSession: { [userID: string]: Array }, peerOlmSessionsCreator: ( userID: string, devices: $ReadOnlyArray, ) => Promise, ): Promise { const creatingSessionPromises = entries(userDevicesWithoutSession).map( async ([userID, devices]) => { try { await peerOlmSessionsCreator( userID, devices.map(deviceID => ({ deviceID })), ); } catch (e) { // Session creation may fail for some devices, // but we should still pursue delivery for others. console.log(e); } }, ); await Promise.all(creatingSessionPromises); } function processOutboundP2PMessagesResult( messageIDs: ?$ReadOnlyArray, sentMessagesMap: { [messageID: string]: boolean, }, ): ProcessOutboundP2PMessagesResult { const sentMessagesSet = new Set(Object.keys(sentMessagesMap)); const failedMessageIDs = messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? []; if (failedMessageIDs.length > 0) { return { result: 'failure', failedMessageIDs, }; } return { result: 'success', }; } async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, peerOlmSessionsCreator: ( userID: string, devices: $ReadOnlyArray, ) => Promise, messageIDs: ?$ReadOnlyArray, allPeerDevices: Set, ): Promise { let authMetadata; try { authMetadata = await identityContext.getAuthMetadata(); } catch (e) { return { result: 'failure', failedMessageIDs: messageIDs ?? [], }; } if ( !authMetadata.deviceID || !authMetadata.userID || !authMetadata.accessToken ) { return { result: 'failure', failedMessageIDs: messageIDs ?? [], }; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); const sentMessagesMap: { [messageID: string]: boolean } = {}; // 1. Retrieve messages to send. let messages; if (messageIDs) { messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); if (messageIDs.length !== messages.length) { const dbMessageIDsSet = new Set( messages.map(message => message.messageID), ); for (const messageID of messageIDs) { if (!dbMessageIDsSet.has(messageID)) { sentMessagesMap[messageID] = true; } } } } else { const allMessages = await sqliteAPI.getAllOutboundP2PMessages(); messages = allMessages.filter(message => message.supportsAutoRetry); } const messagesMap: { [messageID: string]: OutboundP2PMessage } = {}; // 2. Optimistically attempt to send all messages, and all should succeed, // the only exceptions are messages for devices we don't have a session // with or some other issues like network connection. const messagesPromises: Array> = []; for (const message: OutboundP2PMessage of messages) { messagesMap[message.messageID] = message; // If the message was addressed to a peer that no longer // exists we can remove it and return success. if (!allPeerDevices.has(message.deviceID)) { messagesPromises.push( (async () => { await sqliteAPI.removeOutboundP2PMessage( message.messageID, message.deviceID, ); return { status: 'success', messageID: message.messageID, }; })(), ); } else { messagesPromises.push( handleOutboundP2PMessage(message, authMetadata, sendMessage), ); } } const messagesResults: Array = await Promise.all(messagesPromises); // 3. Analyze results to retrieve all devices that need session creation // and map by userID. const userDevicesWithoutSession: { [userID: string]: Array } = {}; const messagesToRetry: Array = []; for (const result of messagesResults) { if (result.status === 'success') { sentMessagesMap[result.messageID] = true; } if (result.status === 'missing_session') { messagesToRetry.push(messagesMap[result.messageID]); const { userID, deviceID } = messagesMap[result.messageID]; if (userDevicesWithoutSession[userID]) { userDevicesWithoutSession[userID].push(deviceID); } else { userDevicesWithoutSession[userID] = [deviceID]; } } } if (messagesToRetry.length === 0) { return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap); } // 4. Create sessions with users who have at least one device // without a session. await createMissingSession(userDevicesWithoutSession, peerOlmSessionsCreator); // 5. Retry messages for which the session was missing. const retryPromises = messagesToRetry.map(message => handleOutboundP2PMessage(message, authMetadata, sendMessage), ); const retryResults: Array = await Promise.all(retryPromises); for (const result of retryResults) { if (result.status === 'success') { sentMessagesMap[result.messageID] = true; } } return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap); } const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; function PeerToPeerProvider(props: Props): React.Node { const { children } = props; const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const dmOpsSendingPromiseResolvers = React.useRef< Map< string, { +resolve: (result: ProcessOutboundP2PMessagesResult) => mixed, +reject: Error => mixed, }, >, >(new Map()); // This returns a promise that will be resolved with arrays of successfully // sent messages, so in case of failing all messages (e.g. no internet // connection) it will still resolve but with an empty array. const getDMOpsSendingPromise = React.useCallback(() => { const dmOpID = uuid.v4(); const promise = new Promise( (resolve, reject) => { dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); }, ); return { promise, dmOpID }; }, []); const processingQueue = React.useRef< Array<{ +outboundMessageIDs: ?$ReadOnlyArray, +dmOpID: ?string, +notificationsCreationData: ?NotificationsCreationData, +allPeerDevices: Set, }>, >([]); const promiseRunning = React.useRef(false); const { createOlmSessionsWithUser: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const sendPushNotifs = useSendPushNotifs(); const allPeerDevices = useSelector(getAllPeerDevices); const allPeerDevicesSet = React.useMemo( () => new Set(allPeerDevices), [allPeerDevices], ); const processOutboundMessages = React.useCallback( ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => { processingQueue.current.push({ outboundMessageIDs, dmOpID, notificationsCreationData, allPeerDevices: allPeerDevicesSet, }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { const queueFront = processingQueue.current.shift(); try { const [result] = await Promise.all([ processOutboundP2PMessages( sendMessageToDevice, identityContext, peerOlmSessionsCreator, queueFront?.outboundMessageIDs, queueFront.allPeerDevices, ), sendPushNotifs(queueFront.notificationsCreationData), ]); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.resolve?.(result); } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.reject?.(e); } } finally { if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); } } } while (processingQueue.current.length > 0); promiseRunning.current = false; })(); } }, [ allPeerDevicesSet, sendMessageToDevice, identityContext, peerOlmSessionsCreator, sendPushNotifs, ], ); const broadcastEphemeralMessage = React.useCallback( async ( contentPayload: string, - recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, + recipients: $ReadOnlyArray, authMetadata: AuthMetadata, ) => { const { userID: thisUserID, deviceID: thisDeviceID } = authMetadata; if (!thisDeviceID || !thisUserID) { throw new Error('No auth metadata'); } const { olmAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); // 1. Optimistically attempt to send all messages, and all should // succeed, the only exceptions are messages for devices we don't // have a session with or some other issues like network connection. const recipientPromises = recipients.map(async recipient => ephemeralEncryptAndSendMessageToPeer( contentPayload, recipient, authMetadata, sendMessageToDevice, ), ); const messagesResults = await Promise.all(recipientPromises); // 2. Analyze results to retrieve all devices that need session creation // and map by userID. const userDevicesWithoutSession: { [userID: string]: Array } = {}; - const recipientsToRetry: Array<{ +userID: string, +deviceID: string }> = - []; + const recipientsToRetry: Array = []; for (const result of messagesResults) { if (result.status === 'missing_session') { recipientsToRetry.push(result.recipient); const { userID, deviceID } = result.recipient; if (userDevicesWithoutSession[userID]) { userDevicesWithoutSession[userID].push(deviceID); } else { userDevicesWithoutSession[userID] = [deviceID]; } } } if (recipientsToRetry.length === 0) { return; } // 3.Create a session with users which has at // least one device without a session. await createMissingSession( userDevicesWithoutSession, peerOlmSessionsCreator, ); // 4. Retry recipients for which session was missing. const retryPromises = recipientsToRetry.map(async recipient => ephemeralEncryptAndSendMessageToPeer( contentPayload, recipient, authMetadata, sendMessageToDevice, ), ); await Promise.all(retryPromises); }, [peerOlmSessionsCreator, sendMessageToDevice], ); React.useEffect(() => { const intervalID = setInterval( processOutboundMessages, AUTOMATIC_RETRY_FREQUENCY, ); return () => clearInterval(intervalID); }, [processOutboundMessages]); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, getDMOpsSendingPromise, broadcastEphemeralMessage, }), [ broadcastEphemeralMessage, processOutboundMessages, getDMOpsSendingPromise, ], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication }; diff --git a/lib/utils/peer-to-peer-communication-utils.js b/lib/utils/peer-to-peer-communication-utils.js index f4f4d7fec..819c8c0e3 100644 --- a/lib/utils/peer-to-peer-communication-utils.js +++ b/lib/utils/peer-to-peer-communication-utils.js @@ -1,194 +1,195 @@ // @flow import { getConfig } from './config.js'; import { olmSessionErrors } from './olm-utils.js'; import { type AuthMetadata } from '../shared/identity-client-context.js'; +import { type P2PMessageRecipient } from '../tunnelbroker/peer-to-peer-context.js'; import type { TunnelbrokerClientMessageToDevice } from '../tunnelbroker/tunnelbroker-context.js'; import { outboundP2PMessageStatuses, type OutboundP2PMessage, } from '../types/sqlite-types.js'; import { peerToPeerMessageTypes, type EncryptedMessage, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; function getClientMessageIDFromTunnelbrokerMessageID( tunnelbrokerMessageID: string, ): string { const ids = tunnelbrokerMessageID.split('#'); if (ids.length !== 2) { throw new Error('Invalid tunnelbrokerMessageID'); } return ids[1]; } async function sendMessageToPeer( message: OutboundP2PMessage, authMetadata: ?AuthMetadata, sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, ): Promise<'success' | 'failure'> { const { sqliteAPI } = getConfig(); if (!authMetadata || !authMetadata.deviceID || !authMetadata.userID) { return 'failure'; } try { const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo: { deviceID: authMetadata.deviceID, userID: authMetadata.userID, }, encryptedData: JSON.parse(message.ciphertext), }; await sendMessage( { deviceID: message.deviceID, payload: JSON.stringify(encryptedMessage), }, message.messageID, ); await sqliteAPI.markOutboundP2PMessageAsSent( message.messageID, message.deviceID, ); return 'success'; } catch (e) { console.error(e); return 'failure'; } } async function encryptAndSendMessageToPeer( message: OutboundP2PMessage, authMetadata: ?AuthMetadata, sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, ): Promise<'success' | 'failure' | 'missing_session'> { const { olmAPI } = getConfig(); try { const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; return await sendMessageToPeer(encryptedMessage, authMetadata, sendMessage); } catch (e) { if (e.message?.includes(olmSessionErrors.sessionDoesNotExist)) { return 'missing_session'; } console.log(`Error sending messages to peer ${message.deviceID}`, e); return 'failure'; } } export type HandleOutboundP2PMessageResult = { +status: 'success' | 'failure' | 'missing_session', +messageID: string, }; async function handleOutboundP2PMessage( message: OutboundP2PMessage, authMetadata: ?AuthMetadata, sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, ): Promise { if (message.status === outboundP2PMessageStatuses.persisted) { const status = await encryptAndSendMessageToPeer( message, authMetadata, sendMessage, ); return { status, messageID: message.messageID, }; } else if (message.status === outboundP2PMessageStatuses.encrypted) { const status = await sendMessageToPeer(message, authMetadata, sendMessage); return { status, messageID: message.messageID, }; } else if (message.status === outboundP2PMessageStatuses.sent) { // Handle edge-case when message was sent, but it wasn't updated // in the message store. return { status: 'success', messageID: message.messageID, }; } return { status: 'failure', messageID: message.messageID, }; } export type EphemeralEncryptAndSendMessageToPeerResult = { +status: 'success' | 'failure' | 'missing_session', - +recipient: { +userID: string, +deviceID: string }, + +recipient: P2PMessageRecipient, }; async function ephemeralEncryptAndSendMessageToPeer( contentPayload: string, - recipient: { +userID: string, +deviceID: string }, + recipient: P2PMessageRecipient, authMetadata: ?AuthMetadata, sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, ): Promise { const { olmAPI } = getConfig(); if (!authMetadata || !authMetadata.deviceID || !authMetadata.userID) { return { status: 'failure', recipient }; } const senderInfo = { deviceID: authMetadata.deviceID, userID: authMetadata.userID, }; try { const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessage({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); return { status: 'success', recipient }; } catch (e) { if (e.message?.includes(olmSessionErrors.sessionDoesNotExist)) { return { status: 'missing_session', recipient }; } console.log(`Error sending messages to peer ${recipient.deviceID}`, e); return { status: 'failure', recipient }; } } export { getClientMessageIDFromTunnelbrokerMessageID, sendMessageToPeer, encryptAndSendMessageToPeer, ephemeralEncryptAndSendMessageToPeer, handleOutboundP2PMessage, };