diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js --- a/lib/handlers/db-ops-handler.react.js +++ b/lib/handlers/db-ops-handler.react.js @@ -41,7 +41,10 @@ if (ops) { await processDBStoreOperations(ops); if (ops.outboundP2PMessages && ops.outboundP2PMessages.length > 0) { - processOutboundMessages(); + const messageIDs = ops.outboundP2PMessages.map( + message => message.messageID, + ); + processOutboundMessages(messageIDs); } } dispatch({ diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -24,7 +24,7 @@ import { getMessageForException } from '../utils/errors.js'; type PeerToPeerContextType = { - +processOutboundMessages: () => void, + +processOutboundMessages: (messageIDs?: $ReadOnlyArray) => void, }; const PeerToPeerContext: React.Context = @@ -40,6 +40,7 @@ messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, + messageIDs: ?$ReadOnlyArray, ): Promise { const authMetadata = await identityContext.getAuthMetadata(); if (!authMetadata.deviceID || !authMetadata.userID || !authMetadata.userID) { @@ -48,7 +49,12 @@ const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); - const messages = await sqliteAPI.getAllOutboundP2PMessage(); + let messages; + if (messageIDs) { + messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); + } else { + messages = await sqliteAPI.getAllOutboundP2PMessage(); + } const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; for (const message: OutboundP2PMessage of messages) { @@ -136,35 +142,41 @@ function PeerToPeerProvider(props: Props): React.Node { const { children } = props; - const restartPromise = React.useRef(false); + const processingQueue = React.useRef>>([]); const promiseRunning = React.useRef(false); const { sendMessage } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); - const processOutboundMessages = React.useCallback(() => { - if (!promiseRunning.current) { - promiseRunning.current = true; - void (async () => { - do { - restartPromise.current = false; - try { - await processOutboundP2PMessages(sendMessage, identityContext); - } catch (e) { - console.log( - `Error processing outbound P2P messages: ${ - getMessageForException(e) ?? 'unknown' - }`, - ); - } - } while (restartPromise.current); - promiseRunning.current = false; - })(); - } else { - restartPromise.current = true; - } - }, [identityContext, sendMessage]); + const processOutboundMessages = React.useCallback( + (messageIDs?: $ReadOnlyArray) => { + processingQueue.current.push(messageIDs); + if (!promiseRunning.current) { + promiseRunning.current = true; + void (async () => { + do { + const nextMessageIDs = processingQueue.current.shift(); + try { + await processOutboundP2PMessages( + sendMessage, + identityContext, + nextMessageIDs, + ); + } catch (e) { + console.log( + `Error processing outbound P2P messages: ${ + getMessageForException(e) ?? 'unknown' + }`, + ); + } + } while (processingQueue.current.length > 0); + promiseRunning.current = false; + })(); + } + }, + [identityContext, sendMessage], + ); const value: PeerToPeerContextType = React.useMemo( () => ({