diff --git a/lib/hooks/actions-queue.js b/lib/hooks/actions-queue.js --- a/lib/hooks/actions-queue.js +++ b/lib/hooks/actions-queue.js @@ -12,6 +12,12 @@ const [queue, setQueue] = React.useState<$ReadOnlyArray>([]); const isProcessing = React.useRef(false); + const enqueue = React.useCallback( + (items: $ReadOnlyArray) => + setQueue(prevQueue => [...prevQueue, ...items]), + [], + ); + const process = React.useCallback( async (action: T) => { isProcessing.current = true; @@ -25,12 +31,6 @@ [performAction], ); - const enqueue = React.useCallback( - (items: $ReadOnlyArray) => - setQueue(prevQueue => [...prevQueue, ...items]), - [], - ); - React.useEffect(() => { if (isProcessing.current || queue.length === 0) { return; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -9,6 +9,7 @@ useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; +import { useActionsQueue } from '../hooks/actions-queue.js'; import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { getAllPeerDevices } from '../selectors/user-selectors.js'; import { @@ -240,6 +241,13 @@ const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; +type QueuedMessage = { + +outboundMessageIDs: ?$ReadOnlyArray, + +dmOpID: ?string, + +notificationsCreationData: ?NotificationsCreationData, + +allPeerDevices: Set, +}; + function PeerToPeerProvider(props: Props): React.Node { const { children } = props; @@ -271,16 +279,6 @@ return { promise, dmOpID }; }, []); - const processingQueue = React.useRef< - Array<{ - +outboundMessageIDs: ?$ReadOnlyArray, - +dmOpID: ?string, - +notificationsCreationData: ?NotificationsCreationData, - +allPeerDevices: Set, - }>, - >([]); - const promiseRunning = React.useRef(false); - const { createOlmSessionsWithUser: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const sendPushNotifs = useSendPushNotifs(); @@ -290,74 +288,61 @@ [allPeerDevices], ); - const processOutboundMessagesLatestVersionRef = React.useRef(-1); + const processItem = React.useCallback( + async (message: QueuedMessage) => { + const { dmOpID } = message; + try { + const [result] = await Promise.all([ + processOutboundP2PMessages( + sendMessageToDevice, + identityContext, + peerOlmSessionsCreator, + message.outboundMessageIDs, + message.allPeerDevices, + ), + sendPushNotifs(message.notificationsCreationData), + ]); + if (dmOpID) { + dmOpsSendingPromiseResolvers.current.get(dmOpID)?.resolve?.(result); + dmOpsSendingPromiseResolvers.current.delete(dmOpID); + } + } catch (e) { + console.log( + `Error processing outbound P2P messages: ${ + getMessageForException(e) ?? 'unknown' + }`, + ); + if (dmOpID) { + dmOpsSendingPromiseResolvers.current.get(dmOpID)?.reject?.(e); + dmOpsSendingPromiseResolvers.current.delete(dmOpID); + } + } + }, + [ + identityContext, + peerOlmSessionsCreator, + sendMessageToDevice, + sendPushNotifs, + ], + ); + const { enqueue } = useActionsQueue(processItem); + const processOutboundMessages = React.useMemo(() => { - const currentVersion = ++processOutboundMessagesLatestVersionRef.current; return ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => { - processingQueue.current.push({ - outboundMessageIDs, - dmOpID, - notificationsCreationData, - allPeerDevices: allPeerDevicesSet, - }); - if (!promiseRunning.current) { - promiseRunning.current = true; - void (async () => { - do { - if ( - currentVersion !== processOutboundMessagesLatestVersionRef.current - ) { - break; - } - const queueFront = processingQueue.current.shift(); - try { - const [result] = await Promise.all([ - processOutboundP2PMessages( - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - queueFront?.outboundMessageIDs, - queueFront.allPeerDevices, - ), - sendPushNotifs(queueFront.notificationsCreationData), - ]); - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current - .get(queueFront.dmOpID) - ?.resolve?.(result); - } - } catch (e) { - console.log( - `Error processing outbound P2P messages: ${ - getMessageForException(e) ?? 'unknown' - }`, - ); - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current - .get(queueFront.dmOpID) - ?.reject?.(e); - } - } finally { - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); - } - } - } while (processingQueue.current.length > 0); - promiseRunning.current = false; - })(); - } + enqueue([ + { + outboundMessageIDs, + dmOpID, + notificationsCreationData, + allPeerDevices: allPeerDevicesSet, + }, + ]); }; - }, [ - allPeerDevicesSet, - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - sendPushNotifs, - ]); + }, [enqueue, allPeerDevicesSet]); const broadcastEphemeralMessage = React.useCallback( async (