diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js --- a/lib/tunnelbroker/peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/peer-to-peer-message-handler.js @@ -8,6 +8,7 @@ usePeerToPeerMessageHandler, } from './use-peer-to-peer-message-handler.js'; import { useLoggedInUserInfo } from '../hooks/account-hooks.js'; +import { useActionsQueue } from '../hooks/actions-queue.js'; import type { InboundP2PMessage } from '../types/sqlite-types.js'; import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import { @@ -33,16 +34,74 @@ const peerToPeerMessageHandler = usePeerToPeerMessageHandler(); const handleOlmMessageToDevice = useHandleOlmMessageToDevice(); - const [messagesQueue, setMessagesQueue] = React.useState< - $ReadOnlyArray<{ - +peerToPeerMessage: PeerToPeerMessage, - +messageID: string, - +localSocketSessionCounter: number, - }>, - >([]); - const [isProcessing, setProcessing] = React.useState(false); - const [processedInboundMessages, setProcessedInboundMessages] = - React.useState(false); + const processItem = React.useCallback( + async ( + item: + | { + +type: 'persisted_message', + +message: InboundP2PMessage, + } + | { + +type: 'received_message', + +message: { + +peerToPeerMessage: PeerToPeerMessage, + +messageID: string, + +localSocketSessionCounter: number, + }, + }, + ) => { + if (item.type === 'persisted_message') { + const { message } = item; + try { + await handleOlmMessageToDevice( + message.plaintext, + { deviceID: message.senderDeviceID, userID: message.senderUserID }, + message.messageID, + ); + } catch (e) { + console.log('Failed processing Olm P2P message:', e); + } + } else { + const { peerToPeerMessage, messageID, localSocketSessionCounter } = + item.message; + // Since scheduling processing this message socket is closed + // or was closed and reopened, we have to stop processing + // because Tunnelbroker flushes the message again when opening + // the socket, and we want to process this only once. + if ( + localSocketSessionCounter !== getSessionCounter() || + !doesSocketExist() + ) { + return; + } + + try { + await peerToPeerMessageHandler(peerToPeerMessage, messageID); + } catch (e) { + console.log(e.message); + } finally { + if ( + localSocketSessionCounter === getSessionCounter() && + doesSocketExist() + ) { + const confirmation: MessageReceiveConfirmation = { + type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, + messageIDs: [messageID], + }; + socketSend(JSON.stringify(confirmation)); + } + } + } + }, + [ + doesSocketExist, + getSessionCounter, + handleOlmMessageToDevice, + peerToPeerMessageHandler, + socketSend, + ], + ); + const { enqueue } = useActionsQueue(processItem); const tunnelbrokerMessageListener = React.useCallback( async (message: TunnelbrokerToDeviceMessage) => { @@ -76,121 +135,60 @@ return; } const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; - setMessagesQueue(oldQueue => [ - ...oldQueue, + enqueue([ { - peerToPeerMessage, - messageID: message.messageID, - localSocketSessionCounter: getSessionCounter(), + type: 'received_message', + message: { + peerToPeerMessage, + messageID: message.messageID, + localSocketSessionCounter: getSessionCounter(), + }, }, ]); }, - [getSessionCounter, socketSend], + [enqueue, getSessionCounter, socketSend], ); - const processPersistedInboundMessages = React.useCallback(async () => { - if (isProcessing || processedInboundMessages) { - return; - } - setProcessing(true); + React.useEffect(() => { + addListener(tunnelbrokerMessageListener); + return () => { + removeListener(tunnelbrokerMessageListener); + }; + }, [addListener, removeListener, tunnelbrokerMessageListener]); + const processPersistedInboundMessages = React.useCallback(async () => { try { const { sqliteAPI } = getConfig(); const messages = await sqliteAPI.getAllInboundP2PMessages(); - - for (const message: InboundP2PMessage of messages) { - try { - await handleOlmMessageToDevice( - message.plaintext, - { deviceID: message.senderDeviceID, userID: message.senderUserID }, - message.messageID, - ); - } catch (e) { - console.log('Failed processing Olm P2P message:', e); - } - } - } finally { - setProcessedInboundMessages(true); - setProcessing(false); - } - }, [handleOlmMessageToDevice, isProcessing, processedInboundMessages]); - - const processMessage = React.useCallback(async () => { - if (messagesQueue.length === 0 || isProcessing) { - return; - } - - setProcessing(true); - - const { peerToPeerMessage, messageID, localSocketSessionCounter } = - messagesQueue[0]; - - // Since scheduling processing this message socket is closed - // or was closed and reopened, we have to stop processing - // because Tunnelbroker flushes the message again when opening - // the socket, and we want to process this only once. - if ( - localSocketSessionCounter !== getSessionCounter() || - !doesSocketExist() - ) { - setMessagesQueue(currentQueue => currentQueue.slice(1)); - setProcessing(false); - return; - } - - try { - await peerToPeerMessageHandler(peerToPeerMessage, messageID); + enqueue( + messages.map(message => ({ + type: 'persisted_message', + message, + })), + ); } catch (e) { - console.log(e.message); - } finally { - if ( - localSocketSessionCounter === getSessionCounter() && - doesSocketExist() - ) { - const confirmation: MessageReceiveConfirmation = { - type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, - messageIDs: [messageID], - }; - socketSend(JSON.stringify(confirmation)); - } - setMessagesQueue(currentQueue => currentQueue.slice(1)); - setProcessing(false); + console.log('error while reading persisted inbound messages:', e.message); } - }, [ - doesSocketExist, - getSessionCounter, - isProcessing, - messagesQueue, - peerToPeerMessageHandler, - socketSend, - ]); + }, [enqueue]); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; + + const processingInputMessagesStarted = React.useRef(false); React.useEffect(() => { - if (isProcessing || !viewerID) { + if (!viewerID || processingInputMessagesStarted.current) { return; } - if (!processedInboundMessages) { - void processPersistedInboundMessages(); - } else if (messagesQueue.length > 0) { - void processMessage(); - } + processingInputMessagesStarted.current = true; + void processPersistedInboundMessages(); }, [ - messagesQueue, - isProcessing, - processMessage, - processedInboundMessages, + addListener, + enqueue, processPersistedInboundMessages, + removeListener, + tunnelbrokerMessageListener, viewerID, ]); - - React.useEffect(() => { - addListener(tunnelbrokerMessageListener); - return () => { - removeListener(tunnelbrokerMessageListener); - }; - }, [addListener, removeListener, tunnelbrokerMessageListener]); } export { PeerToPeerMessageHandler };