diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js --- a/lib/tunnelbroker/peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/peer-to-peer-message-handler.js @@ -26,7 +26,14 @@ const { addListener, removeListener } = useTunnelbroker(); const peerToPeerMessageHandler = usePeerToPeerMessageHandler(); - const currentlyProcessedMessage = React.useRef>(null); + const [messagesQueue, setMessagesQueue] = React.useState< + Array<{ + +peerToPeerMessage: PeerToPeerMessage, + +messageID: string, + +localSocketSessionCounter: number, + }>, + >([]); + const [isProcessing, setProcessing] = React.useState(false); const tunnelbrokerMessageListener = React.useCallback( async (message: TunnelbrokerToDeviceMessage) => { @@ -60,38 +67,74 @@ return; } const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; - currentlyProcessedMessage.current = (async () => { - const localSocketSessionCounter = getSessionCounter(); - await currentlyProcessedMessage.current; - // Since scheduling processing this message socket is closed - // or was closed and reopened, we have to stop processing - // because Tunnelbroker flushes the message again when opening - // the socket, and we want to process this only once - // to maintain order. - if ( - localSocketSessionCounter !== getSessionCounter() || - !doesSocketExist() - ) { - return; - } - try { - await peerToPeerMessageHandler(peerToPeerMessage, message.messageID); - } catch (e) { - console.log(e.message); - } finally { - if ( - localSocketSessionCounter === getSessionCounter() && - doesSocketExist() - ) { - // We confirm regardless of success or error while processing. - socketSend(JSON.stringify(confirmation)); - } - } - })(); + setMessagesQueue(oldQueue => [ + ...oldQueue, + { + peerToPeerMessage, + messageID: message.messageID, + localSocketSessionCounter: getSessionCounter(), + }, + ]); }, - [getSessionCounter, peerToPeerMessageHandler, doesSocketExist, socketSend], + [getSessionCounter, socketSend], ); + const processMessage = React.useCallback(async () => { + if (messagesQueue.length === 0 || isProcessing) { + return; + } + + setProcessing(true); + + const { peerToPeerMessage, messageID, localSocketSessionCounter } = + messagesQueue[0]; + + // Since scheduling processing this message socket is closed + // or was closed and reopened, we have to stop processing + // because Tunnelbroker flushes the message again when opening + // the socket, and we want to process this only once. + if ( + localSocketSessionCounter !== getSessionCounter() || + !doesSocketExist() + ) { + setMessagesQueue(currentQueue => currentQueue.slice(1)); + setProcessing(false); + return; + } + + try { + await peerToPeerMessageHandler(peerToPeerMessage, messageID); + } catch (e) { + console.log(e.message); + } finally { + if ( + localSocketSessionCounter === getSessionCounter() && + doesSocketExist() + ) { + const confirmation: MessageReceiveConfirmation = { + type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, + messageIDs: [messageID], + }; + socketSend(JSON.stringify(confirmation)); + } + setMessagesQueue(currentQueue => currentQueue.slice(1)); + setProcessing(false); + } + }, [ + doesSocketExist, + getSessionCounter, + isProcessing, + messagesQueue, + peerToPeerMessageHandler, + socketSend, + ]); + + React.useEffect(() => { + if (!isProcessing && messagesQueue.length > 0) { + void processMessage(); + } + }, [messagesQueue, isProcessing, processMessage]); + React.useEffect(() => { addListener(tunnelbrokerMessageListener); return () => {