diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index 3c20d159f..c4a52c601 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,444 +1,426 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { type AuthMetadata, IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; import type { NotificationsCreationData } from '../types/notif-types.js'; -import { - type OutboundP2PMessage, - outboundP2PMessageStatuses, -} from '../types/sqlite-types.js'; -import { - type EncryptedMessage, - peerToPeerMessageTypes, -} from '../types/tunnelbroker/peer-to-peer-message-types.js'; +import { type OutboundP2PMessage } from '../types/sqlite-types.js'; import { getConfig } from '../utils/config.js'; import type { DeviceSessionCreationRequest } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { entries } from '../utils/objects.js'; -import { olmSessionErrors } from '../utils/olm-utils.js'; +import { + ephemeralEncryptAndSendMessageToPeer, + handleOutboundP2PMessage, + type HandleOutboundP2PMessageResult, +} from '../utils/peer-to-peer-communication-utils.js'; type PeerToPeerContextType = { +processOutboundMessages: ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => void, +getDMOpsSendingPromise: () => { +promise: Promise, +dmOpID: string, }, +broadcastEphemeralMessage: ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => Promise, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; export type ProcessOutboundP2PMessagesResult = | { +result: 'success' } | { +result: 'failure', +failedMessageIDs: $ReadOnlyArray, }; +async function createMissingSession( + userDevicesWithoutSession: { [userID: string]: Array }, + peerOlmSessionsCreator: ( + userID: string, + devices: $ReadOnlyArray, + ) => Promise, +): Promise { + const creatingSessionPromises = entries(userDevicesWithoutSession).map( + async ([userID, devices]) => { + try { + await peerOlmSessionsCreator( + userID, + devices.map(deviceID => ({ deviceID })), + ); + } catch (e) { + // Session creation may fail for some devices, + // but we should still pursue delivery for others. + console.log(e); + } + }, + ); + await Promise.all(creatingSessionPromises); +} + +function processOutboundP2PMessagesResult( + messageIDs: ?$ReadOnlyArray, + sentMessagesMap: { + [messageID: string]: boolean, + }, +): ProcessOutboundP2PMessagesResult { + const sentMessagesSet = new Set(Object.keys(sentMessagesMap)); + const failedMessageIDs = + messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? []; + if (failedMessageIDs.length > 0) { + return { + result: 'failure', + failedMessageIDs, + }; + } + return { + result: 'success', + }; +} + async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, peerOlmSessionsCreator: ( userID: string, devices: $ReadOnlyArray, ) => Promise, messageIDs: ?$ReadOnlyArray, ): Promise { let authMetadata; try { authMetadata = await identityContext.getAuthMetadata(); } catch (e) { return { result: 'failure', failedMessageIDs: messageIDs ?? [], }; } if ( !authMetadata.deviceID || !authMetadata.userID || !authMetadata.accessToken ) { return { result: 'failure', failedMessageIDs: messageIDs ?? [], }; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); const sentMessagesMap: { [messageID: string]: boolean } = {}; + // 1. Retrieve messages to send. let messages; if (messageIDs) { messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); if (messageIDs.length !== messages.length) { const dbMessageIDsSet = new Set( messages.map(message => message.messageID), ); for (const messageID of messageIDs) { if (!dbMessageIDsSet.has(messageID)) { sentMessagesMap[messageID] = true; } } } } else { const allMessages = await sqliteAPI.getAllOutboundP2PMessages(); messages = allMessages.filter(message => message.supportsAutoRetry); } - const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; + const messagesMap: { [messageID: string]: OutboundP2PMessage } = {}; + + // 2. Optimistically attempt to send all messages, and all should succeed, + // the only exceptions are messages for devices we don't have a session + // with or some other issues like network connection. + const messagesPromises: Array> = []; for (const message: OutboundP2PMessage of messages) { - if (!devicesMap[message.deviceID]) { - devicesMap[message.deviceID] = [message]; - } else { - devicesMap[message.deviceID].push(message); - } + messagesMap[message.messageID] = message; + messagesPromises.push( + handleOutboundP2PMessage(message, authMetadata, sendMessage), + ); } - - const sendMessageToPeer = async ( - message: OutboundP2PMessage, - ): Promise => { - if (!authMetadata.deviceID || !authMetadata.userID) { - return; + const messagesResults: Array = + await Promise.all(messagesPromises); + + // 3. Analyze results to retrieve all devices that need session creation + // and map by userID. + const userDevicesWithoutSession: { [userID: string]: Array } = {}; + const messagesToRetry: Array = []; + for (const result of messagesResults) { + if (result.status === 'success') { + sentMessagesMap[result.messageID] = true; } - try { - const encryptedMessage: EncryptedMessage = { - type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, - senderInfo: { - deviceID: authMetadata.deviceID, - userID: authMetadata.userID, - }, - encryptedData: JSON.parse(message.ciphertext), - }; - await sendMessage( - { - deviceID: message.deviceID, - payload: JSON.stringify(encryptedMessage), - }, - message.messageID, - ); - await sqliteAPI.markOutboundP2PMessageAsSent( - message.messageID, - message.deviceID, - ); - sentMessagesMap[message.messageID] = true; - } catch (e) { - console.error(e); + if (result.status === 'missing_session') { + messagesToRetry.push(messagesMap[result.messageID]); + const { userID, deviceID } = messagesMap[result.messageID]; + if (userDevicesWithoutSession[userID]) { + userDevicesWithoutSession[userID].push(deviceID); + } else { + userDevicesWithoutSession[userID] = [deviceID]; + } } - }; - - const devicePromises = entries(devicesMap).map( - async ([peerDeviceID, deviceMessages]) => { - for (const message of deviceMessages) { - if (message.status === outboundP2PMessageStatuses.persisted) { - try { - const result = await olmAPI.encryptAndPersist( - message.plaintext, - message.deviceID, - message.messageID, - ); - - const encryptedMessage: OutboundP2PMessage = { - ...message, - ciphertext: JSON.stringify(result), - }; - await sendMessageToPeer(encryptedMessage); - } catch (e) { - if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) { - console.log(`Error sending messages to peer ${peerDeviceID}`, e); - break; - } - try { - await peerOlmSessionsCreator(message.userID, [ - { deviceID: peerDeviceID }, - ]); - const result = await olmAPI.encryptAndPersist( - message.plaintext, - message.deviceID, - message.messageID, - ); - const encryptedMessage: OutboundP2PMessage = { - ...message, - ciphertext: JSON.stringify(result), - }; + } - await sendMessageToPeer(encryptedMessage); - } catch (err) { - console.log( - `Error sending messages to peer ${peerDeviceID}`, - err, - ); - break; - } - } - } else if (message.status === outboundP2PMessageStatuses.encrypted) { - await sendMessageToPeer(message); - } else if (message.status === outboundP2PMessageStatuses.sent) { - // Handle edge-case when message was sent, but it wasn't updated - // in the message store. - sentMessagesMap[message.messageID] = true; - } - } - }, - ); + if (messagesToRetry.length === 0) { + return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap); + } - await Promise.all(devicePromises); + // 4. Create sessions with users who have at least one device + // without a session. + await createMissingSession(userDevicesWithoutSession, peerOlmSessionsCreator); - const sentMessagesSet = new Set(Object.keys(sentMessagesMap)); - const failedMessageIDs = - messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? []; - if (failedMessageIDs.length > 0) { - return { - result: 'failure', - failedMessageIDs, - }; + // 5. Retry messages for which the session was missing. + const retryPromises = messagesToRetry.map(message => + handleOutboundP2PMessage(message, authMetadata, sendMessage), + ); + const retryResults: Array = + await Promise.all(retryPromises); + for (const result of retryResults) { + if (result.status === 'success') { + sentMessagesMap[result.messageID] = true; + } } - return { - result: 'success', - }; + + return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap); } const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; function PeerToPeerProvider(props: Props): React.Node { const { children } = props; const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const dmOpsSendingPromiseResolvers = React.useRef< Map< string, { +resolve: (result: ProcessOutboundP2PMessagesResult) => mixed, +reject: Error => mixed, }, >, >(new Map()); // This returns a promise that will be resolved with arrays of successfully // sent messages, so in case of failing all messages (e.g. no internet // connection) it will still resolve but with an empty array. const getDMOpsSendingPromise = React.useCallback(() => { const dmOpID = uuid.v4(); const promise = new Promise( (resolve, reject) => { dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); }, ); return { promise, dmOpID }; }, []); const processingQueue = React.useRef< Array<{ +outboundMessageIDs: ?$ReadOnlyArray, +dmOpID: ?string, +notificationsCreationData: ?NotificationsCreationData, }>, >([]); const promiseRunning = React.useRef(false); const { createOlmSessionsWithUser: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const sendPushNotifs = useSendPushNotifs(); const processOutboundMessages = React.useCallback( ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => { processingQueue.current.push({ outboundMessageIDs, dmOpID, notificationsCreationData, }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { const queueFront = processingQueue.current.shift(); try { const [result] = await Promise.all([ processOutboundP2PMessages( sendMessageToDevice, identityContext, peerOlmSessionsCreator, queueFront?.outboundMessageIDs, ), sendPushNotifs(queueFront.notificationsCreationData), ]); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.resolve?.(result); } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.reject?.(e); } } finally { if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); } } } while (processingQueue.current.length > 0); promiseRunning.current = false; })(); } }, [ sendPushNotifs, peerOlmSessionsCreator, identityContext, sendMessageToDevice, ], ); const broadcastEphemeralMessage = React.useCallback( async ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => { const { userID: thisUserID, deviceID: thisDeviceID } = authMetadata; if (!thisDeviceID || !thisUserID) { throw new Error('No auth metadata'); } const { olmAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); - // We want it distinct by device ID to avoid potentially creating - // multiple Olm sessions with the same device simultaneously. - const recipientsDistinctByDeviceID = [ - ...new Map(recipients.map(item => [item.deviceID, item])).values(), - ]; - const senderInfo = { deviceID: thisDeviceID, userID: thisUserID }; - const promises = recipientsDistinctByDeviceID.map(async recipient => { - try { - const encryptedData = await olmAPI.encrypt( - contentPayload, - recipient.deviceID, - ); - const encryptedMessage: EncryptedMessage = { - type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, - senderInfo, - encryptedData, - }; - await sendMessageToDevice({ - deviceID: recipient.deviceID, - payload: JSON.stringify(encryptedMessage), - }); - } catch (e) { - if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) { - console.log( - `Error sending messages to peer ${recipient.deviceID}`, - e, - ); - return; - } - try { - await peerOlmSessionsCreator(recipient.userID, [ - { deviceID: recipient.deviceID }, - ]); - const encryptedData = await olmAPI.encrypt( - contentPayload, - recipient.deviceID, - ); - const encryptedMessage: EncryptedMessage = { - type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, - senderInfo, - encryptedData, - }; - await sendMessageToDevice({ - deviceID: recipient.deviceID, - payload: JSON.stringify(encryptedMessage), - }); - } catch (err) { - console.warn( - `Error sending Olm-encrypted message to device ${recipient.deviceID}:`, - err, - ); + // 1. Optimistically attempt to send all messages, and all should + // succeed, the only exceptions are messages for devices we don't + // have a session with or some other issues like network connection. + const recipientPromises = recipients.map(async recipient => + ephemeralEncryptAndSendMessageToPeer( + contentPayload, + recipient, + authMetadata, + sendMessageToDevice, + ), + ); + const messagesResults = await Promise.all(recipientPromises); + + // 2. Analyze results to retrieve all devices that need session creation + // and map by userID. + const userDevicesWithoutSession: { [userID: string]: Array } = {}; + const recipientsToRetry: Array<{ +userID: string, +deviceID: string }> = + []; + for (const result of messagesResults) { + if (result.status === 'missing_session') { + recipientsToRetry.push(result.recipient); + const { userID, deviceID } = result.recipient; + if (userDevicesWithoutSession[userID]) { + userDevicesWithoutSession[userID].push(deviceID); + } else { + userDevicesWithoutSession[userID] = [deviceID]; } } - }); - await Promise.all(promises); + } + if (recipientsToRetry.length === 0) { + return; + } + + // 3.Create a session with users which has at + // least one device without a session. + await createMissingSession( + userDevicesWithoutSession, + peerOlmSessionsCreator, + ); + + // 4. Retry recipients for which session was missing. + const retryPromises = recipientsToRetry.map(async recipient => + ephemeralEncryptAndSendMessageToPeer( + contentPayload, + recipient, + authMetadata, + sendMessageToDevice, + ), + ); + await Promise.all(retryPromises); }, [peerOlmSessionsCreator, sendMessageToDevice], ); React.useEffect(() => { const intervalID = setInterval( processOutboundMessages, AUTOMATIC_RETRY_FREQUENCY, ); return () => clearInterval(intervalID); }, [processOutboundMessages]); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, getDMOpsSendingPromise, broadcastEphemeralMessage, }), [ broadcastEphemeralMessage, processOutboundMessages, getDMOpsSendingPromise, ], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication };