diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js --- a/lib/tunnelbroker/peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/peer-to-peer-message-handler.js @@ -3,7 +3,11 @@ import * as React from 'react'; import { useTunnelbroker } from './tunnelbroker-context.js'; -import { usePeerToPeerMessageHandler } from './use-peer-to-peer-message-handler.js'; +import { + useHandleOlmMessageToDevice, + usePeerToPeerMessageHandler, +} from './use-peer-to-peer-message-handler.js'; +import type { InboundP2PMessage } from '../types/sqlite-types.js'; import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import { deviceToTunnelbrokerMessageTypes, @@ -14,6 +18,7 @@ peerToPeerMessageValidator, type PeerToPeerMessage, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; +import { getConfig } from '../utils/config.js'; type Props = { +socketSend: (message: string) => void, @@ -25,6 +30,7 @@ const { addListener, removeListener } = useTunnelbroker(); const peerToPeerMessageHandler = usePeerToPeerMessageHandler(); + const handleOlmMessageToDevice = useHandleOlmMessageToDevice(); const [messagesQueue, setMessagesQueue] = React.useState< $ReadOnlyArray<{ @@ -34,6 +40,8 @@ }>, >([]); const [isProcessing, setProcessing] = React.useState(false); + const [processedInboundMessages, setProcessedInboundMessages] = + React.useState(false); const tunnelbrokerMessageListener = React.useCallback( async (message: TunnelbrokerToDeviceMessage) => { @@ -79,6 +87,33 @@ [getSessionCounter, socketSend], ); + const processPersistedInboundMessages = React.useCallback(async () => { + if (isProcessing || processedInboundMessages) { + return; + } + setProcessing(true); + + try { + const { sqliteAPI } = getConfig(); + const messages = await sqliteAPI.getAllInboundP2PMessages(); + + for (const message: InboundP2PMessage of messages) { + try { + await handleOlmMessageToDevice( + message.plaintext, + { deviceID: message.senderDeviceID, userID: message.senderUserID }, + message.messageID, + ); + } catch (e) { + console.log('Failed processing Olm P2P message:', e); + } + } + } finally { + setProcessedInboundMessages(true); + setProcessing(false); + } + }, [handleOlmMessageToDevice, isProcessing, processedInboundMessages]); + const processMessage = React.useCallback(async () => { if (messagesQueue.length === 0 || isProcessing) { return; @@ -130,10 +165,21 @@ ]); React.useEffect(() => { - if (!isProcessing && messagesQueue.length > 0) { + if (isProcessing) { + return; + } + if (!processedInboundMessages) { + void processPersistedInboundMessages(); + } else if (messagesQueue.length > 0) { void processMessage(); } - }, [messagesQueue, isProcessing, processMessage]); + }, [ + messagesQueue, + isProcessing, + processMessage, + processedInboundMessages, + processPersistedInboundMessages, + ]); React.useEffect(() => { addListener(tunnelbrokerMessageListener); diff --git a/lib/tunnelbroker/use-peer-to-peer-message-handler.js b/lib/tunnelbroker/use-peer-to-peer-message-handler.js --- a/lib/tunnelbroker/use-peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/use-peer-to-peer-message-handler.js @@ -58,7 +58,11 @@ }); // handles `peerToPeerMessageTypes.ENCRYPTED_MESSAGE` -function useHandleOlmMessageToDevice() { +function useHandleOlmMessageToDevice(): ( + decryptedMessageContent: string, + senderInfo: SenderInfo, + messageID: string, +) => Promise { const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); @@ -81,6 +85,8 @@ senderInfo: SenderInfo, messageID: string, ) => { + const { sqliteAPI } = getConfig(); + const parsedMessageToDevice = JSON.parse(decryptedMessageContent); // Handle user-action messages @@ -93,6 +99,7 @@ userActionMessage.type === userActionsP2PMessageTypes.LOG_OUT_PRIMARY_DEVICE ) { + // causes log out, there is no need to remove Inbound P2P message void dispatchActionPromise( logOutActionTypes, primaryDeviceRequestedLogOut(), @@ -110,9 +117,11 @@ await broadcastDeviceListUpdates( allPeerDevices.filter(deviceID => deviceID !== deviceIDToLogOut), ); + await sqliteAPI.removeInboundP2PMessages([messageID]); } else if ( userActionMessage.type === userActionsP2PMessageTypes.DM_OPERATION ) { + // inbound P2P message is removed in DBOpsHandler after processing await processDMOperation(userActionMessage.op, { messageID, senderDeviceID: senderInfo.deviceID, @@ -128,7 +137,7 @@ if (senderInfo.userID === thisUserID) { await reBroadcastAccountDeletion(); // we treat account deletion the same way as primary-device-requested - // logout + // logout, no need to remove Inbound P2P message void dispatchActionPromise( logOutActionTypes, primaryDeviceRequestedLogOut(), @@ -138,6 +147,7 @@ type: removePeerUsersActionType, payload: { userIDs: [senderInfo.userID] }, }); + await sqliteAPI.removeInboundP2PMessages([messageID]); } } else { console.warn( @@ -409,4 +419,4 @@ ); } -export { usePeerToPeerMessageHandler }; +export { usePeerToPeerMessageHandler, useHandleOlmMessageToDevice };