diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js index f84792d69..4fb7c323e 100644 --- a/lib/tunnelbroker/peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/peer-to-peer-message-handler.js @@ -1,103 +1,146 @@ // @flow import * as React from 'react'; import { useTunnelbroker } from './tunnelbroker-context.js'; import { usePeerToPeerMessageHandler } from './use-peer-to-peer-message-handler.js'; import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import { deviceToTunnelbrokerMessageTypes, type TunnelbrokerToDeviceMessage, tunnelbrokerToDeviceMessageTypes, } from '../types/tunnelbroker/messages.js'; import { peerToPeerMessageValidator, type PeerToPeerMessage, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; type Props = { +socketSend: (message: string) => void, +getSessionCounter: () => number, +doesSocketExist: () => boolean, }; function PeerToPeerMessageHandler(props: Props): React.Node { const { socketSend, getSessionCounter, doesSocketExist } = props; const { addListener, removeListener } = useTunnelbroker(); const peerToPeerMessageHandler = usePeerToPeerMessageHandler(); - const currentlyProcessedMessage = React.useRef>(null); + const [messagesQueue, setMessagesQueue] = React.useState< + $ReadOnlyArray<{ + +peerToPeerMessage: PeerToPeerMessage, + +messageID: string, + +localSocketSessionCounter: number, + }>, + >([]); + const [isProcessing, setProcessing] = React.useState(false); const tunnelbrokerMessageListener = React.useCallback( async (message: TunnelbrokerToDeviceMessage) => { if (message.type !== tunnelbrokerToDeviceMessageTypes.MESSAGE_TO_DEVICE) { return; } const confirmation: MessageReceiveConfirmation = { type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, messageIDs: [message.messageID], }; let rawPeerToPeerMessage; try { rawPeerToPeerMessage = JSON.parse(message.payload); } catch (e) { console.log( 'error while parsing Tunnelbroker peer-to-peer message:', e.message, ); // Client received incorrect message, confirm to remove from // Tunnelbroker queue. socketSend(JSON.stringify(confirmation)); return; } if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { console.log('invalid Tunnelbroker PeerToPeerMessage'); // The client received an invalid Tunnelbroker message, // and cannot process this type of request. socketSend(JSON.stringify(confirmation)); return; } const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; - currentlyProcessedMessage.current = (async () => { - const localSocketSessionCounter = getSessionCounter(); - await currentlyProcessedMessage.current; - // Since scheduling processing this message socket is closed - // or was closed and reopened, we have to stop processing - // because Tunnelbroker flushes the message again when opening - // the socket, and we want to process this only once - // to maintain order. - if ( - localSocketSessionCounter !== getSessionCounter() || - !doesSocketExist() - ) { - return; - } - try { - await peerToPeerMessageHandler(peerToPeerMessage, message.messageID); - } catch (e) { - console.log(e.message); - } finally { - if ( - localSocketSessionCounter === getSessionCounter() && - doesSocketExist() - ) { - // We confirm regardless of success or error while processing. - socketSend(JSON.stringify(confirmation)); - } - } - })(); + setMessagesQueue(oldQueue => [ + ...oldQueue, + { + peerToPeerMessage, + messageID: message.messageID, + localSocketSessionCounter: getSessionCounter(), + }, + ]); }, - [getSessionCounter, peerToPeerMessageHandler, doesSocketExist, socketSend], + [getSessionCounter, socketSend], ); + const processMessage = React.useCallback(async () => { + if (messagesQueue.length === 0 || isProcessing) { + return; + } + + setProcessing(true); + + const { peerToPeerMessage, messageID, localSocketSessionCounter } = + messagesQueue[0]; + + // Since scheduling processing this message socket is closed + // or was closed and reopened, we have to stop processing + // because Tunnelbroker flushes the message again when opening + // the socket, and we want to process this only once. + if ( + localSocketSessionCounter !== getSessionCounter() || + !doesSocketExist() + ) { + setMessagesQueue(currentQueue => currentQueue.slice(1)); + setProcessing(false); + return; + } + + try { + await peerToPeerMessageHandler(peerToPeerMessage, messageID); + } catch (e) { + console.log(e.message); + } finally { + if ( + localSocketSessionCounter === getSessionCounter() && + doesSocketExist() + ) { + const confirmation: MessageReceiveConfirmation = { + type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, + messageIDs: [messageID], + }; + socketSend(JSON.stringify(confirmation)); + } + setMessagesQueue(currentQueue => currentQueue.slice(1)); + setProcessing(false); + } + }, [ + doesSocketExist, + getSessionCounter, + isProcessing, + messagesQueue, + peerToPeerMessageHandler, + socketSend, + ]); + + React.useEffect(() => { + if (!isProcessing && messagesQueue.length > 0) { + void processMessage(); + } + }, [messagesQueue, isProcessing, processMessage]); + React.useEffect(() => { addListener(tunnelbrokerMessageListener); return () => { removeListener(tunnelbrokerMessageListener); }; }, [addListener, removeListener, tunnelbrokerMessageListener]); } export { PeerToPeerMessageHandler };