diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index fbe90e71b..1feba3db2 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,463 +1,450 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; +import { useActionsQueue } from '../hooks/actions-queue.js'; import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { getAllPeerDevices } from '../selectors/user-selectors.js'; import { type AuthMetadata, IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; import type { NotificationsCreationData } from '../types/notif-types.js'; import { type OutboundP2PMessage } from '../types/sqlite-types.js'; import { getConfig } from '../utils/config.js'; import type { DeviceSessionCreationRequest } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { entries } from '../utils/objects.js'; import { ephemeralEncryptAndSendMessageToPeer, handleOutboundP2PMessage, type HandleOutboundP2PMessageResult, } from '../utils/peer-to-peer-communication-utils.js'; import { useSelector } from '../utils/redux-utils.js'; export type P2PMessageRecipient = { +userID: string, +deviceID: string }; type PeerToPeerContextType = { +processOutboundMessages: ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => void, +getDMOpsSendingPromise: () => { +promise: Promise, +dmOpID: string, }, +broadcastEphemeralMessage: ( contentPayload: string, recipients: $ReadOnlyArray, authMetadata: AuthMetadata, ) => Promise, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; export type ProcessOutboundP2PMessagesResult = | { +result: 'success' } | { +result: 'failure', +failedMessageIDs: $ReadOnlyArray, }; async function createMissingSession( userDevicesWithoutSession: { [userID: string]: Array }, peerOlmSessionsCreator: ( userID: string, devices: $ReadOnlyArray, ) => Promise, ): Promise { const creatingSessionPromises = entries(userDevicesWithoutSession).map( async ([userID, devices]) => { try { await peerOlmSessionsCreator( userID, devices.map(deviceID => ({ deviceID })), ); } catch (e) { // Session creation may fail for some devices, // but we should still pursue delivery for others. console.log(e); } }, ); await Promise.all(creatingSessionPromises); } function processOutboundP2PMessagesResult( messageIDs: ?$ReadOnlyArray, sentMessagesMap: { [messageID: string]: boolean, }, ): ProcessOutboundP2PMessagesResult { const sentMessagesSet = new Set(Object.keys(sentMessagesMap)); const failedMessageIDs = messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? []; if (failedMessageIDs.length > 0) { return { result: 'failure', failedMessageIDs, }; } return { result: 'success', }; } async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, peerOlmSessionsCreator: ( userID: string, devices: $ReadOnlyArray, ) => Promise, messageIDs: ?$ReadOnlyArray, - allPeerDevices: Set, + allPeerDevices: $ReadOnlySet, ): Promise { let authMetadata; try { authMetadata = await identityContext.getAuthMetadata(); } catch (e) { return { result: 'failure', failedMessageIDs: messageIDs ?? [], }; } if ( !authMetadata.deviceID || !authMetadata.userID || !authMetadata.accessToken ) { return { result: 'failure', failedMessageIDs: messageIDs ?? [], }; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); const sentMessagesMap: { [messageID: string]: boolean } = {}; // 1. Retrieve messages to send. let messages; if (messageIDs) { messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); if (messageIDs.length !== messages.length) { const dbMessageIDsSet = new Set( messages.map(message => message.messageID), ); for (const messageID of messageIDs) { if (!dbMessageIDsSet.has(messageID)) { sentMessagesMap[messageID] = true; } } } } else { const allMessages = await sqliteAPI.getAllOutboundP2PMessages(); messages = allMessages.filter(message => message.supportsAutoRetry); } const messagesMap: { [messageID: string]: OutboundP2PMessage } = {}; // 2. Optimistically attempt to send all messages, and all should succeed, // the only exceptions are messages for devices we don't have a session // with or some other issues like network connection. const messagesPromises: Array> = []; for (const message: OutboundP2PMessage of messages) { messagesMap[message.messageID] = message; // If the message was addressed to a peer that no longer // exists we can remove it and return success. if (!allPeerDevices.has(message.deviceID)) { messagesPromises.push( (async () => { await sqliteAPI.removeOutboundP2PMessage( message.messageID, message.deviceID, ); return { status: 'success', messageID: message.messageID, }; })(), ); } else { messagesPromises.push( handleOutboundP2PMessage(message, authMetadata, sendMessage), ); } } const messagesResults: Array = await Promise.all(messagesPromises); // 3. Analyze results to retrieve all devices that need session creation // and map by userID. const userDevicesWithoutSession: { [userID: string]: Array } = {}; const messagesToRetry: Array = []; for (const result of messagesResults) { if (result.status === 'success') { sentMessagesMap[result.messageID] = true; } if (result.status === 'missing_session') { messagesToRetry.push(messagesMap[result.messageID]); const { userID, deviceID } = messagesMap[result.messageID]; if (userDevicesWithoutSession[userID]) { userDevicesWithoutSession[userID].push(deviceID); } else { userDevicesWithoutSession[userID] = [deviceID]; } } } if (messagesToRetry.length === 0) { return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap); } // 4. Create sessions with users who have at least one device // without a session. await createMissingSession(userDevicesWithoutSession, peerOlmSessionsCreator); // 5. Retry messages for which the session was missing. const retryPromises = messagesToRetry.map(message => handleOutboundP2PMessage(message, authMetadata, sendMessage), ); const retryResults: Array = await Promise.all(retryPromises); for (const result of retryResults) { if (result.status === 'success') { sentMessagesMap[result.messageID] = true; } } return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap); } const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; +type QueuedMessage = { + +outboundMessageIDs: ?$ReadOnlyArray, + +dmOpID: ?string, + +notificationsCreationData: ?NotificationsCreationData, + +allPeerDevices: $ReadOnlySet, +}; + function PeerToPeerProvider(props: Props): React.Node { const { children } = props; const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const dmOpsSendingPromiseResolvers = React.useRef< Map< string, { +resolve: (result: ProcessOutboundP2PMessagesResult) => mixed, +reject: Error => mixed, }, >, >(new Map()); // This returns a promise that will be resolved with arrays of successfully // sent messages, so in case of failing all messages (e.g. no internet // connection) it will still resolve but with an empty array. const getDMOpsSendingPromise = React.useCallback(() => { const dmOpID = uuid.v4(); const promise = new Promise( (resolve, reject) => { dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); }, ); return { promise, dmOpID }; }, []); - const processingQueue = React.useRef< - Array<{ - +outboundMessageIDs: ?$ReadOnlyArray, - +dmOpID: ?string, - +notificationsCreationData: ?NotificationsCreationData, - +allPeerDevices: Set, - }>, - >([]); - const promiseRunning = React.useRef(false); - const { createOlmSessionsWithUser: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const sendPushNotifs = useSendPushNotifs(); const allPeerDevices = useSelector(getAllPeerDevices); const allPeerDevicesSet = React.useMemo( () => new Set(allPeerDevices), [allPeerDevices], ); - const processOutboundMessagesLatestVersionRef = React.useRef(-1); + const processItem = React.useCallback( + async (message: QueuedMessage) => { + const { dmOpID } = message; + try { + const [result] = await Promise.all([ + processOutboundP2PMessages( + sendMessageToDevice, + identityContext, + peerOlmSessionsCreator, + message.outboundMessageIDs, + message.allPeerDevices, + ), + sendPushNotifs(message.notificationsCreationData), + ]); + if (dmOpID) { + dmOpsSendingPromiseResolvers.current.get(dmOpID)?.resolve?.(result); + } + } catch (e) { + console.log( + `Error processing outbound P2P messages: ${ + getMessageForException(e) ?? 'unknown' + }`, + ); + if (dmOpID) { + dmOpsSendingPromiseResolvers.current.get(dmOpID)?.reject?.(e); + } + } finally { + if (dmOpID) { + dmOpsSendingPromiseResolvers.current.delete(dmOpID); + } + } + }, + [ + identityContext, + peerOlmSessionsCreator, + sendMessageToDevice, + sendPushNotifs, + ], + ); + const { enqueue } = useActionsQueue(processItem); + const processOutboundMessages = React.useMemo(() => { - const currentVersion = ++processOutboundMessagesLatestVersionRef.current; return ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => { - processingQueue.current.push({ - outboundMessageIDs, - dmOpID, - notificationsCreationData, - allPeerDevices: allPeerDevicesSet, - }); - if (!promiseRunning.current) { - promiseRunning.current = true; - void (async () => { - do { - if ( - currentVersion !== processOutboundMessagesLatestVersionRef.current - ) { - break; - } - const queueFront = processingQueue.current.shift(); - try { - const [result] = await Promise.all([ - processOutboundP2PMessages( - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - queueFront?.outboundMessageIDs, - queueFront.allPeerDevices, - ), - sendPushNotifs(queueFront.notificationsCreationData), - ]); - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current - .get(queueFront.dmOpID) - ?.resolve?.(result); - } - } catch (e) { - console.log( - `Error processing outbound P2P messages: ${ - getMessageForException(e) ?? 'unknown' - }`, - ); - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current - .get(queueFront.dmOpID) - ?.reject?.(e); - } - } finally { - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); - } - } - } while (processingQueue.current.length > 0); - promiseRunning.current = false; - })(); - } + enqueue([ + { + outboundMessageIDs, + dmOpID, + notificationsCreationData, + allPeerDevices: allPeerDevicesSet, + }, + ]); }; - }, [ - allPeerDevicesSet, - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - sendPushNotifs, - ]); + }, [enqueue, allPeerDevicesSet]); const broadcastEphemeralMessage = React.useCallback( async ( contentPayload: string, recipients: $ReadOnlyArray, authMetadata: AuthMetadata, ) => { const { userID: thisUserID, deviceID: thisDeviceID } = authMetadata; if (!thisDeviceID || !thisUserID) { throw new Error('No auth metadata'); } const { olmAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); // 1. Optimistically attempt to send all messages, and all should // succeed, the only exceptions are messages for devices we don't // have a session with or some other issues like network connection. const recipientPromises = recipients.map(async recipient => ephemeralEncryptAndSendMessageToPeer( contentPayload, recipient, authMetadata, sendMessageToDevice, ), ); const messagesResults = await Promise.all(recipientPromises); // 2. Analyze results to retrieve all devices that need session creation // and map by userID. const userDevicesWithoutSession: { [userID: string]: Array } = {}; const recipientsToRetry: Array = []; for (const result of messagesResults) { if (result.status === 'missing_session') { recipientsToRetry.push(result.recipient); const { userID, deviceID } = result.recipient; if (userDevicesWithoutSession[userID]) { userDevicesWithoutSession[userID].push(deviceID); } else { userDevicesWithoutSession[userID] = [deviceID]; } } } if (recipientsToRetry.length === 0) { return; } // 3.Create a session with users which has at // least one device without a session. await createMissingSession( userDevicesWithoutSession, peerOlmSessionsCreator, ); // 4. Retry recipients for which session was missing. const retryPromises = recipientsToRetry.map(async recipient => ephemeralEncryptAndSendMessageToPeer( contentPayload, recipient, authMetadata, sendMessageToDevice, ), ); await Promise.all(retryPromises); }, [peerOlmSessionsCreator, sendMessageToDevice], ); React.useEffect(() => { const intervalID = setInterval( processOutboundMessages, AUTOMATIC_RETRY_FREQUENCY, ); return () => clearInterval(intervalID); }, [processOutboundMessages]); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, getDMOpsSendingPromise, broadcastEphemeralMessage, }), [ broadcastEphemeralMessage, processOutboundMessages, getDMOpsSendingPromise, ], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication };