diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js new file mode 100644 --- /dev/null +++ b/lib/tunnelbroker/peer-to-peer-message-handler.js @@ -0,0 +1,111 @@ +// @flow + +import invariant from 'invariant'; +import * as React from 'react'; + +import { useTunnelbroker } from './tunnelbroker-context.js'; +import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js'; +import { IdentityClientContext } from '../shared/identity-client-context.js'; +import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; +import { + tunnelbrokerMessageTypes, + type TunnelbrokerMessage, +} from '../types/tunnelbroker/messages.js'; +import { + peerToPeerMessageValidator, + type PeerToPeerMessage, +} from '../types/tunnelbroker/peer-to-peer-message-types.js'; + +type Props = { + +socketSend: (message: string) => void, + +getSessionCounter: () => number, + +doesSocketExist: () => boolean, +}; +function PeerToPeerMessageHandler(props: Props): React.Node { + const { socketSend, getSessionCounter, doesSocketExist } = props; + + const { addListener, removeListener } = useTunnelbroker(); + + const identityContext = React.useContext(IdentityClientContext); + invariant(identityContext, 'Identity context should be set'); + const { identityClient } = identityContext; + + const currentlyProcessedMessage = React.useRef>(null); + + const tunnelbrokerMessageListener = React.useCallback( + async (message: TunnelbrokerMessage) => { + if (message.type !== tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) { + return; + } + const confirmation: MessageReceiveConfirmation = { + type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, + messageIDs: [message.messageID], + }; + + let rawPeerToPeerMessage; + try { + rawPeerToPeerMessage = JSON.parse(message.payload); + } catch (e) { + console.log( + 'error while parsing Tunnelbroker peer-to-peer message:', + e.message, + ); + // Client received incorrect message, confirm to remove from + // Tunnelbroker queue. + socketSend(JSON.stringify(confirmation)); + return; + } + + if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { + console.log('invalid Tunnelbroker PeerToPeerMessage'); + // The client received an invalid Tunnelbroker message, + // and cannot process this type of request. + socketSend(JSON.stringify(confirmation)); + return; + } + const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; + currentlyProcessedMessage.current = (async () => { + const localSocketSessionCounter = getSessionCounter(); + await currentlyProcessedMessage.current; + // Since scheduling processing this message socket is closed + // or was closed and reopened, we have to stop processing + // because Tunnelbroker flushes the message again when opening + // the socket, and we want to process this only once + // to maintain order. + if ( + localSocketSessionCounter !== getSessionCounter() || + !doesSocketExist() + ) { + return; + } + try { + await peerToPeerMessageHandler( + peerToPeerMessage, + identityClient, + message.messageID, + ); + } catch (e) { + console.log(e.message); + } finally { + if ( + localSocketSessionCounter === getSessionCounter() && + doesSocketExist() + ) { + // We confirm regardless of success or error while processing. + socketSend(JSON.stringify(confirmation)); + } + } + })(); + }, + [getSessionCounter, identityClient, doesSocketExist, socketSend], + ); + + React.useEffect(() => { + addListener(tunnelbrokerMessageListener); + return () => { + removeListener(tunnelbrokerMessageListener); + }; + }, [addListener, removeListener, tunnelbrokerMessageListener]); +} + +export { PeerToPeerMessageHandler }; diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js --- a/lib/tunnelbroker/tunnelbroker-context.js +++ b/lib/tunnelbroker/tunnelbroker-context.js @@ -6,13 +6,12 @@ import uuid from 'uuid'; import { PeerToPeerProvider } from './peer-to-peer-context.js'; +import { PeerToPeerMessageHandler } from './peer-to-peer-message-handler.js'; import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js'; import { tunnnelbrokerURL } from '../facts/tunnelbroker.js'; -import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js'; import { IdentityClientContext } from '../shared/identity-client-context.js'; import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js'; import { isWebPlatform } from '../types/device-types.js'; -import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import type { MessageSentStatus } from '../types/tunnelbroker/message-to-device-request-status-types.js'; import type { MessageToDeviceRequest } from '../types/tunnelbroker/message-to-device-request-types.js'; import { @@ -20,10 +19,6 @@ tunnelbrokerMessageTypes, tunnelbrokerMessageValidator, } from '../types/tunnelbroker/messages.js'; -import { - type PeerToPeerMessage, - peerToPeerMessageValidator, -} from '../types/tunnelbroker/peer-to-peer-message-types.js'; import type { AnonymousInitializationMessage, ConnectionInitializationMessage, @@ -129,7 +124,6 @@ const [connected, setConnected] = React.useState(false); const listeners = React.useRef>(new Set()); const socket = React.useRef(null); - const currentlyProcessedMessage = React.useRef>(null); const socketSessionCounter = React.useRef(0); const promises = React.useRef({}); const heartbeatTimeoutID = React.useRef(); @@ -237,6 +231,7 @@ listener(message); } + // MESSAGE_TO_DEVICE is handled in PeerToPeerMessageHandler if ( message.type === tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE @@ -258,68 +253,6 @@ message.status.data, ); } - } else if ( - message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE - ) { - const confirmation: MessageReceiveConfirmation = { - type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, - messageIDs: [message.messageID], - }; - - let rawPeerToPeerMessage; - try { - rawPeerToPeerMessage = JSON.parse(message.payload); - } catch (e) { - console.log( - 'error while parsing Tunnelbroker peer-to-peer message:', - e.message, - ); - // Client received incorrect message, confirm to remove from - // Tunnelbroker queue. - socket.current?.send(JSON.stringify(confirmation)); - return; - } - - if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { - console.log('invalid Tunnelbroker PeerToPeerMessage'); - // The client received an invalid Tunnelbroker message, - // and cannot process this type of request. - socket.current?.send(JSON.stringify(confirmation)); - return; - } - const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; - currentlyProcessedMessage.current = (async () => { - const localSocketSessionCounter = socketSessionCounter.current; - await currentlyProcessedMessage.current; - // Since scheduling processing this message socket is closed - // or was closed and reopened, we have to stop processing - // because Tunnelbroker flushes the message again when opening - // the socket, and we want to process this only once - // to maintain order. - if ( - localSocketSessionCounter !== socketSessionCounter.current || - !socket.current - ) { - return; - } - try { - await peerToPeerMessageHandler( - peerToPeerMessage, - identityClient, - message.messageID, - ); - } catch (e) { - console.log(e.message); - } finally { - if ( - localSocketSessionCounter === socketSessionCounter.current && - socket.current - ) { - // We confirm regardless of success or error while processing. - socket.current.send(JSON.stringify(confirmation)); - } - } - })(); } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS @@ -459,6 +392,17 @@ [], ); + const getSessionCounter = React.useCallback( + () => socketSessionCounter.current, + [], + ); + + const doesSocketExist = React.useCallback(() => !!socket.current, []); + + const socketSend = React.useCallback((message: string) => { + socket.current?.send(message); + }, []); + const value: TunnelbrokerContextType = React.useMemo( () => ({ sendMessage, @@ -473,6 +417,11 @@ return ( + {children} );