Page MenuHomePhabricator

D12371.diff
No OneTemporary

D12371.diff

diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js
new file mode 100644
--- /dev/null
+++ b/lib/tunnelbroker/peer-to-peer-message-handler.js
@@ -0,0 +1,111 @@
+// @flow
+
+import invariant from 'invariant';
+import * as React from 'react';
+
+import { useTunnelbroker } from './tunnelbroker-context.js';
+import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js';
+import { IdentityClientContext } from '../shared/identity-client-context.js';
+import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js';
+import {
+ tunnelbrokerMessageTypes,
+ type TunnelbrokerMessage,
+} from '../types/tunnelbroker/messages.js';
+import {
+ peerToPeerMessageValidator,
+ type PeerToPeerMessage,
+} from '../types/tunnelbroker/peer-to-peer-message-types.js';
+
+type Props = {
+ +socketSend: (message: string) => void,
+ +getSessionCounter: () => number,
+ +doesSocketExist: () => boolean,
+};
+function PeerToPeerMessageHandler(props: Props): React.Node {
+ const { socketSend, getSessionCounter, doesSocketExist } = props;
+ // Installs a Tunnelbroker listener for MESSAGE_TO_DEVICE; returns no JSX.
+ const { addListener, removeListener } = useTunnelbroker();
+ // The identity client is forwarded to peerToPeerMessageHandler below.
+ const identityContext = React.useContext(IdentityClientContext);
+ invariant(identityContext, 'Identity context should be set');
+ const { identityClient } = identityContext;
+ // Tail of the processing chain: each new message awaits the previous one.
+ const currentlyProcessedMessage = React.useRef<?Promise<mixed>>(null);
+ // Listener: parse, validate, then queue the message for ordered processing.
+ const tunnelbrokerMessageListener = React.useCallback(
+ async (message: TunnelbrokerMessage) => {
+ if (message.type !== tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) {
+ return;
+ }
+ const confirmation: MessageReceiveConfirmation = {
+ type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
+ messageIDs: [message.messageID],
+ };
+ // Step 1: the payload must be valid JSON.
+ let rawPeerToPeerMessage;
+ try {
+ rawPeerToPeerMessage = JSON.parse(message.payload);
+ } catch (e) {
+ console.log(
+ 'error while parsing Tunnelbroker peer-to-peer message:',
+ e.message,
+ );
+ // The payload is not valid JSON and can never be processed, so confirm
+ // receipt to have it removed from the Tunnelbroker queue.
+ socketSend(JSON.stringify(confirmation));
+ return;
+ }
+ // Step 2: the parsed payload must pass peerToPeerMessageValidator.
+ if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) {
+ console.log('invalid Tunnelbroker PeerToPeerMessage');
+ // The payload failed validation, so this client cannot process it;
+ // confirm receipt anyway, as in the parse-failure case above.
+ socketSend(JSON.stringify(confirmation));
+ return;
+ }
+ const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage;
+ currentlyProcessedMessage.current = (async () => {
+ const localSocketSessionCounter = getSessionCounter();
+ await currentlyProcessedMessage.current; // previous message's promise
+ // If, since this message was scheduled, the socket was closed
+ // (or closed and reopened), we have to stop processing here,
+ // because Tunnelbroker flushes the message again when opening
+ // the socket, and we want to process this only once
+ // to maintain order.
+ if (
+ localSocketSessionCounter !== getSessionCounter() ||
+ !doesSocketExist()
+ ) {
+ return;
+ }
+ try {
+ await peerToPeerMessageHandler(
+ peerToPeerMessage,
+ identityClient,
+ message.messageID,
+ );
+ } catch (e) {
+ console.log(e.message); // handler errors are logged, not rethrown
+ } finally {
+ if (
+ localSocketSessionCounter === getSessionCounter() &&
+ doesSocketExist()
+ ) {
+ // Confirm whether processing succeeded or threw (same session only).
+ socketSend(JSON.stringify(confirmation));
+ }
+ }
+ })();
+ },
+ [getSessionCounter, identityClient, doesSocketExist, socketSend],
+ );
+ // Register the listener; the cleanup removes it before re-registering.
+ React.useEffect(() => {
+ addListener(tunnelbrokerMessageListener);
+ return () => {
+ removeListener(tunnelbrokerMessageListener);
+ };
+ }, [addListener, removeListener, tunnelbrokerMessageListener]);
+} // NOTE(review): no return statement; confirm undefined is a valid render
+
+export { PeerToPeerMessageHandler };
diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js
--- a/lib/tunnelbroker/tunnelbroker-context.js
+++ b/lib/tunnelbroker/tunnelbroker-context.js
@@ -6,13 +6,12 @@
import uuid from 'uuid';
import { PeerToPeerProvider } from './peer-to-peer-context.js';
+import { PeerToPeerMessageHandler } from './peer-to-peer-message-handler.js';
import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js';
import { tunnnelbrokerURL } from '../facts/tunnelbroker.js';
-import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js';
import { IdentityClientContext } from '../shared/identity-client-context.js';
import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js';
import { isWebPlatform } from '../types/device-types.js';
-import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js';
import type { MessageSentStatus } from '../types/tunnelbroker/message-to-device-request-status-types.js';
import type { MessageToDeviceRequest } from '../types/tunnelbroker/message-to-device-request-types.js';
import {
@@ -20,10 +19,6 @@
tunnelbrokerMessageTypes,
tunnelbrokerMessageValidator,
} from '../types/tunnelbroker/messages.js';
-import {
- type PeerToPeerMessage,
- peerToPeerMessageValidator,
-} from '../types/tunnelbroker/peer-to-peer-message-types.js';
import type {
AnonymousInitializationMessage,
ConnectionInitializationMessage,
@@ -129,7 +124,6 @@
const [connected, setConnected] = React.useState(false);
const listeners = React.useRef<Set<TunnelbrokerSocketListener>>(new Set());
const socket = React.useRef<?WebSocket>(null);
- const currentlyProcessedMessage = React.useRef<?Promise<mixed>>(null);
const socketSessionCounter = React.useRef(0);
const promises = React.useRef<Promises>({});
const heartbeatTimeoutID = React.useRef<?TimeoutID>();
@@ -237,6 +231,7 @@
listener(message);
}
+ // MESSAGE_TO_DEVICE is handled in PeerToPeerMessageHandler
if (
message.type ===
tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE
@@ -258,68 +253,6 @@
message.status.data,
);
}
- } else if (
- message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE
- ) {
- const confirmation: MessageReceiveConfirmation = {
- type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
- messageIDs: [message.messageID],
- };
-
- let rawPeerToPeerMessage;
- try {
- rawPeerToPeerMessage = JSON.parse(message.payload);
- } catch (e) {
- console.log(
- 'error while parsing Tunnelbroker peer-to-peer message:',
- e.message,
- );
- // Client received incorrect message, confirm to remove from
- // Tunnelbroker queue.
- socket.current?.send(JSON.stringify(confirmation));
- return;
- }
-
- if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) {
- console.log('invalid Tunnelbroker PeerToPeerMessage');
- // The client received an invalid Tunnelbroker message,
- // and cannot process this type of request.
- socket.current?.send(JSON.stringify(confirmation));
- return;
- }
- const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage;
- currentlyProcessedMessage.current = (async () => {
- const localSocketSessionCounter = socketSessionCounter.current;
- await currentlyProcessedMessage.current;
- // Since scheduling processing this message socket is closed
- // or was closed and reopened, we have to stop processing
- // because Tunnelbroker flushes the message again when opening
- // the socket, and we want to process this only once
- // to maintain order.
- if (
- localSocketSessionCounter !== socketSessionCounter.current ||
- !socket.current
- ) {
- return;
- }
- try {
- await peerToPeerMessageHandler(
- peerToPeerMessage,
- identityClient,
- message.messageID,
- );
- } catch (e) {
- console.log(e.message);
- } finally {
- if (
- localSocketSessionCounter === socketSessionCounter.current &&
- socket.current
- ) {
- // We confirm regardless of success or error while processing.
- socket.current.send(JSON.stringify(confirmation));
- }
- }
- })();
} else if (
message.type ===
tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS
@@ -459,6 +392,17 @@
[],
);
+ const getSessionCounter = React.useCallback(
+ () => socketSessionCounter.current, // reads the ref on every call
+ [],
+ );
+ // True while the socket ref holds a value.
+ const doesSocketExist = React.useCallback(() => !!socket.current, []);
+ // No-op when the socket ref is empty (optional chaining).
+ const socketSend = React.useCallback((message: string) => {
+ socket.current?.send(message);
+ }, []);
+
const value: TunnelbrokerContextType = React.useMemo(
() => ({
sendMessage,
@@ -473,6 +417,11 @@
return (
<TunnelbrokerContext.Provider value={value}>
+ <PeerToPeerMessageHandler
+ getSessionCounter={getSessionCounter}
+ doesSocketExist={doesSocketExist}
+ socketSend={socketSend}
+ />
<PeerToPeerProvider>{children}</PeerToPeerProvider>
</TunnelbrokerContext.Provider>
);

File Metadata

Mime Type
text/plain
Expires
Mon, Nov 25, 7:37 PM (22 h, 1 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2580819
Default Alt Text
D12371.diff (9 KB)

Event Timeline