diff --git a/lib/hooks/actions-queue.js b/lib/hooks/actions-queue.js --- a/lib/hooks/actions-queue.js +++ b/lib/hooks/actions-queue.js @@ -7,28 +7,31 @@ }; function useActionsQueue( - performAction: (item: T) => mixed | Promise, + performAction: ( + item: T, + enqueue: (items: $ReadOnlyArray) => mixed, + ) => mixed | Promise, ): MessageQueueHook { const [queue, setQueue] = React.useState<$ReadOnlyArray>([]); const isProcessing = React.useRef(false); + const enqueue = React.useCallback( + (items: $ReadOnlyArray) => + setQueue(prevQueue => [...prevQueue, ...items]), + [], + ); + const process = React.useCallback( async (action: T) => { isProcessing.current = true; try { - await performAction(action); + await performAction(action, enqueue); } finally { isProcessing.current = false; setQueue(currentQueue => currentQueue.slice(1)); } }, - [performAction], - ); - - const enqueue = React.useCallback( - (items: $ReadOnlyArray) => - setQueue(prevQueue => [...prevQueue, ...items]), - [], + [enqueue, performAction], ); React.useEffect(() => { diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -9,6 +9,7 @@ useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; +import { useActionsQueue } from '../hooks/actions-queue.js'; import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { getAllPeerDevices } from '../selectors/user-selectors.js'; import { @@ -240,6 +241,28 @@ const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; +type QueuedMessage = { + +outboundMessageIDs: ?$ReadOnlyArray, + +dmOpID: ?string, + +notificationsCreationData: ?NotificationsCreationData, + +allPeerDevices: Set, +}; +type QueueItem = + | { + +type: 'process_outbound_messages', + +message: QueuedMessage, + } + | { + +type: 'send_notifs', + +message: QueuedMessage, + +processingResult: ProcessOutboundP2PMessagesResult, + } + | { + +type: 'resolve', + +message: QueuedMessage, + +result: ProcessOutboundP2PMessagesResult, + }; + function PeerToPeerProvider(props: Props): React.Node { const { children } = props; @@ -271,16 +294,6 @@ return { promise, dmOpID }; }, []); - const processingQueue = React.useRef< - Array<{ - +outboundMessageIDs: ?$ReadOnlyArray, - +dmOpID: ?string, - +notificationsCreationData: ?NotificationsCreationData, - +allPeerDevices: Set, - }>, - >([]); - const promiseRunning = React.useRef(false); - const { createOlmSessionsWithUser: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const sendPushNotifs = useSendPushNotifs(); @@ -290,74 +303,82 @@ [allPeerDevices], ); - const processOutboundMessagesLatestVersionRef = React.useRef(-1); + const processItem = React.useCallback( + async (item: QueueItem, enqueue: ($ReadOnlyArray) => mixed) => { + const { dmOpID } = item.message; + try { + if (item.type === 'process_outbound_messages') { + const result = await processOutboundP2PMessages( + sendMessageToDevice, + identityContext, + peerOlmSessionsCreator, + item.message.outboundMessageIDs, + item.message.allPeerDevices, + ); + enqueue([ + { + type: 'send_notifs', + message: item.message, + processingResult: result, + }, + ]); + } else if (item.type === 'send_notifs') { + await sendPushNotifs(item.message.notificationsCreationData); + enqueue([ + { + type: 'resolve', + message: item.message, + result: item.processingResult, + }, + ]); + } else { + if (dmOpID) { + dmOpsSendingPromiseResolvers.current + .get(dmOpID) + ?.resolve?.(item.result); + dmOpsSendingPromiseResolvers.current.delete(dmOpID); + } + } + } catch (e) { + console.log( + `Error processing outbound P2P messages: ${ + getMessageForException(e) ?? 'unknown' + }`, + ); + if (dmOpID) { + dmOpsSendingPromiseResolvers.current.get(dmOpID)?.reject?.(e); + dmOpsSendingPromiseResolvers.current.delete(dmOpID); + } + } + }, + [ + identityContext, + peerOlmSessionsCreator, + sendMessageToDevice, + sendPushNotifs, + ], + ); + const { enqueue } = useActionsQueue(processItem); + const processOutboundMessages = React.useMemo(() => { - const currentVersion = ++processOutboundMessagesLatestVersionRef.current; return ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => { - processingQueue.current.push({ - outboundMessageIDs, - dmOpID, - notificationsCreationData, - allPeerDevices: allPeerDevicesSet, - }); - if (!promiseRunning.current) { - promiseRunning.current = true; - void (async () => { - do { - if ( - currentVersion !== processOutboundMessagesLatestVersionRef.current - ) { - break; - } - const queueFront = processingQueue.current.shift(); - try { - const [result] = await Promise.all([ - processOutboundP2PMessages( - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - queueFront?.outboundMessageIDs, - queueFront.allPeerDevices, - ), - sendPushNotifs(queueFront.notificationsCreationData), - ]); - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current - .get(queueFront.dmOpID) - ?.resolve?.(result); - } - } catch (e) { - console.log( - `Error processing outbound P2P messages: ${ - getMessageForException(e) ?? 'unknown' - }`, - ); - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current - .get(queueFront.dmOpID) - ?.reject?.(e); - } - } finally { - if (queueFront.dmOpID) { - dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); - } - } - } while (processingQueue.current.length > 0); - promiseRunning.current = false; - })(); - } + enqueue([ + { + type: 'process_outbound_messages', + message: { + outboundMessageIDs, + dmOpID, + notificationsCreationData, + allPeerDevices: allPeerDevicesSet, + }, + }, + ]); }; - }, [ - allPeerDevicesSet, - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - sendPushNotifs, - ]); + }, [enqueue, allPeerDevicesSet]); const broadcastEphemeralMessage = React.useCallback( async (