diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js --- a/lib/tunnelbroker/peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/peer-to-peer-message-handler.js @@ -8,6 +8,7 @@ usePeerToPeerMessageHandler, } from './use-peer-to-peer-message-handler.js'; import { useLoggedInUserInfo } from '../hooks/account-hooks.js'; +import { useActionsQueue } from '../hooks/actions-queue.js'; import type { InboundP2PMessage } from '../types/sqlite-types.js'; import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import { @@ -88,17 +89,20 @@ [getSessionCounter, socketSend], ); - const processPersistedInboundMessages = React.useCallback(async () => { - if (isProcessing || processedInboundMessages) { - return; - } - setProcessing(true); - - try { - const { sqliteAPI } = getConfig(); - const messages = await sqliteAPI.getAllInboundP2PMessages(); - - for (const message: InboundP2PMessage of messages) { + const processItem = React.useCallback( + async ( + item: + | { + +type: 'message', + +message: InboundP2PMessage, + } + | { + +type: 'function', + +itemFunction: () => mixed, + }, + ) => { + if (item.type === 'message') { + const { message } = item; try { await handleOlmMessageToDevice( message.plaintext, @@ -108,12 +112,41 @@ } catch (e) { console.log('Failed processing Olm P2P message:', e); } + } else { + item.itemFunction(); } + }, + [handleOlmMessageToDevice], + ); + const { enqueue } = useActionsQueue(processItem); + + const processPersistedInboundMessages = React.useCallback(async () => { + if (isProcessing || processedInboundMessages) { + return; + } + setProcessing(true); + + try { + const { sqliteAPI } = getConfig(); + const messages = await sqliteAPI.getAllInboundP2PMessages(); + enqueue( + messages.map(message => ({ + type: 'message', + message, + })), + ); } finally { - setProcessedInboundMessages(true); - setProcessing(false); + enqueue([ + { + type: 'function', + itemFunction: () => { + setProcessedInboundMessages(true); + setProcessing(false); + }, + }, + ]); } - }, [handleOlmMessageToDevice, isProcessing, processedInboundMessages]); + }, [enqueue, isProcessing, processedInboundMessages]); const processMessage = React.useCallback(async () => { if (messagesQueue.length === 0 || isProcessing) {