diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js index 6eba0272f..0eb4955b9 100644 --- a/lib/handlers/db-ops-handler.react.js +++ b/lib/handlers/db-ops-handler.react.js @@ -1,85 +1,88 @@ // @flow import * as React from 'react'; import { opsProcessingFinishedActionType } from '../actions/db-ops-actions.js'; import { usePeerToPeerCommunication } from '../tunnelbroker/peer-to-peer-context.js'; import { useTunnelbroker } from '../tunnelbroker/tunnelbroker-context.js'; import type { DBOpsEntry } from '../types/db-ops-types.js'; import type { StoreOperations } from '../types/store-ops-types.js'; import { type MessageProcessed, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getContentSigningKey } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { useDispatch, useSelector } from '../utils/redux-utils.js'; type Props = { +processDBStoreOperations: StoreOperations => Promise, }; function DBOpsHandler(props: Props): React.Node { const { sqliteAPI } = getConfig(); const { processDBStoreOperations } = props; const queueFront = useSelector(state => state.dbOpsStore.queuedOps[0]); const prevQueueFront = React.useRef(null); const { sendMessage } = useTunnelbroker(); const { processOutboundMessages } = usePeerToPeerCommunication(); const dispatch = useDispatch(); React.useEffect(() => { if (!queueFront || prevQueueFront.current === queueFront) { return; } prevQueueFront.current = queueFront; const { ops, messageSourceMetadata } = queueFront; void (async () => { if (ops) { await processDBStoreOperations(ops); if (ops.outboundP2PMessages && ops.outboundP2PMessages.length > 0) { - processOutboundMessages(); + const messageIDs = ops.outboundP2PMessages.map( + message => message.messageID, + ); + processOutboundMessages(messageIDs); } } dispatch({ type: opsProcessingFinishedActionType, }); if (messageSourceMetadata) { try { const { messageID, senderDeviceID } = messageSourceMetadata; const deviceID = await getContentSigningKey(); const message: MessageProcessed = { type: peerToPeerMessageTypes.MESSAGE_PROCESSED, messageID, deviceID, }; await sendMessage({ deviceID: senderDeviceID, payload: JSON.stringify(message), }); await sqliteAPI.removeInboundP2PMessages([messageID]); } catch (e) { console.log( `Error while sending confirmation: ${ getMessageForException(e) ?? 'unknown error' }`, ); } } })(); }, [ queueFront, dispatch, processDBStoreOperations, sendMessage, sqliteAPI, processOutboundMessages, ]); return null; } export { DBOpsHandler }; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index 504eda8b6..d02d6a5f1 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,190 +1,202 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; import { type OutboundP2PMessage, outboundP2PMessageStatuses, } from '../types/sqlite-types.js'; import { type EncryptedMessage, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { createOlmSessionWithPeer } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; type PeerToPeerContextType = { - +processOutboundMessages: () => void, + +processOutboundMessages: (messageIDs?: $ReadOnlyArray) => void, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, + messageIDs: ?$ReadOnlyArray, ): Promise { const authMetadata = await identityContext.getAuthMetadata(); if (!authMetadata.deviceID || !authMetadata.userID || !authMetadata.userID) { return; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); - const messages = await sqliteAPI.getAllOutboundP2PMessage(); + let messages; + if (messageIDs) { + messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); + } else { + messages = await sqliteAPI.getAllOutboundP2PMessage(); + } const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; for (const message: OutboundP2PMessage of messages) { if (!devicesMap[message.deviceID]) { devicesMap[message.deviceID] = [message]; } else { devicesMap[message.deviceID].push(message); } } const sendMessageToPeer = async ( message: OutboundP2PMessage, ): Promise => { if (!authMetadata.deviceID || !authMetadata.userID) { return; } const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo: { deviceID: authMetadata.deviceID, userID: authMetadata.userID, }, encryptedData: JSON.parse(message.ciphertext), }; await sendMessage( { deviceID: message.deviceID, payload: JSON.stringify(encryptedMessage), }, message.messageID, ); await sqliteAPI.markOutboundP2PMessageAsSent( message.messageID, message.deviceID, ); }; for (const peerDeviceID in devicesMap) { for (const message of devicesMap[peerDeviceID]) { if (message.status === outboundP2PMessageStatuses.persisted) { try { const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (e) { try { await createOlmSessionWithPeer( authMetadata, identityContext.identityClient, sendMessage, message.userID, peerDeviceID, ); const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (err) { console.log(`Error sending messages to peer ${peerDeviceID}`, err); break; } } } else if (message.status === outboundP2PMessageStatuses.encrypted) { await sendMessageToPeer(message); } } } } function PeerToPeerProvider(props: Props): React.Node { const { children } = props; - const restartPromise = React.useRef(false); + const processingQueue = React.useRef>>([]); const promiseRunning = React.useRef(false); const { sendMessage } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); - const processOutboundMessages = React.useCallback(() => { - if (!promiseRunning.current) { - promiseRunning.current = true; - void (async () => { - do { - restartPromise.current = false; - try { - await processOutboundP2PMessages(sendMessage, identityContext); - } catch (e) { - console.log( - `Error processing outbound P2P messages: ${ - getMessageForException(e) ?? 'unknown' - }`, - ); - } - } while (restartPromise.current); - promiseRunning.current = false; - })(); - } else { - restartPromise.current = true; - } - }, [identityContext, sendMessage]); + const processOutboundMessages = React.useCallback( + (messageIDs?: $ReadOnlyArray) => { + processingQueue.current.push(messageIDs); + if (!promiseRunning.current) { + promiseRunning.current = true; + void (async () => { + do { + const nextMessageIDs = processingQueue.current.shift(); + try { + await processOutboundP2PMessages( + sendMessage, + identityContext, + nextMessageIDs, + ); + } catch (e) { + console.log( + `Error processing outbound P2P messages: ${ + getMessageForException(e) ?? 'unknown' + }`, + ); + } + } while (processingQueue.current.length > 0); + promiseRunning.current = false; + })(); + } + }, + [identityContext, sendMessage], + ); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, }), [processOutboundMessages], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication };