Changeset View
Changeset View
Standalone View
Standalone View
lib/tunnelbroker/tunnelbroker-context.js
// @flow | // @flow | ||||
import invariant from 'invariant'; | import invariant from 'invariant'; | ||||
import _isEqual from 'lodash/fp/isEqual.js'; | |||||
import * as React from 'react'; | import * as React from 'react'; | ||||
import uuid from 'uuid'; | import uuid from 'uuid'; | ||||
import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js'; | import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js'; | ||||
import { tunnnelbrokerURL } from '../facts/tunnelbroker.js'; | import { tunnnelbrokerURL } from '../facts/tunnelbroker.js'; | ||||
import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js'; | import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js'; | ||||
import { IdentityClientContext } from '../shared/identity-client-context.js'; | import { IdentityClientContext } from '../shared/identity-client-context.js'; | ||||
import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js'; | import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js'; | ||||
import { isWebPlatform } from '../types/device-types.js'; | |||||
import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; | import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; | ||||
import type { MessageSentStatus } from '../types/tunnelbroker/message-to-device-request-status-types.js'; | import type { MessageSentStatus } from '../types/tunnelbroker/message-to-device-request-status-types.js'; | ||||
import type { MessageToDeviceRequest } from '../types/tunnelbroker/message-to-device-request-types.js'; | import type { MessageToDeviceRequest } from '../types/tunnelbroker/message-to-device-request-types.js'; | ||||
import { | import { | ||||
type TunnelbrokerMessage, | type TunnelbrokerMessage, | ||||
tunnelbrokerMessageTypes, | tunnelbrokerMessageTypes, | ||||
tunnelbrokerMessageValidator, | tunnelbrokerMessageValidator, | ||||
} from '../types/tunnelbroker/messages.js'; | } from '../types/tunnelbroker/messages.js'; | ||||
import { | import { | ||||
type PeerToPeerMessage, | type PeerToPeerMessage, | ||||
peerToPeerMessageValidator, | peerToPeerMessageValidator, | ||||
} from '../types/tunnelbroker/peer-to-peer-message-types.js'; | } from '../types/tunnelbroker/peer-to-peer-message-types.js'; | ||||
import type { | import type { | ||||
AnonymousInitializationMessage, | AnonymousInitializationMessage, | ||||
TunnelbrokerInitializationMessage, | |||||
ConnectionInitializationMessage, | ConnectionInitializationMessage, | ||||
TunnelbrokerInitializationMessage, | |||||
TunnelbrokerDeviceTypes, | |||||
} from '../types/tunnelbroker/session-types.js'; | } from '../types/tunnelbroker/session-types.js'; | ||||
import type { Heartbeat } from '../types/websocket/heartbeat-types.js'; | import type { Heartbeat } from '../types/websocket/heartbeat-types.js'; | ||||
import { getConfig } from '../utils/config.js'; | |||||
import { getContentSigningKey } from '../utils/crypto-utils.js'; | |||||
import { useSelector } from '../utils/redux-utils.js'; | |||||
export type ClientMessageToDevice = { | export type ClientMessageToDevice = { | ||||
+deviceID: string, | +deviceID: string, | ||||
+payload: string, | +payload: string, | ||||
}; | }; | ||||
export type TunnelbrokerSocketListener = ( | export type TunnelbrokerSocketListener = ( | ||||
message: TunnelbrokerMessage, | message: TunnelbrokerMessage, | ||||
Show All 16 Lines | |||||
const TunnelbrokerContext: React.Context<?TunnelbrokerContextType> = | const TunnelbrokerContext: React.Context<?TunnelbrokerContextType> = | ||||
React.createContext<?TunnelbrokerContextType>(); | React.createContext<?TunnelbrokerContextType>(); | ||||
type Props = { | type Props = { | ||||
+children: React.Node, | +children: React.Node, | ||||
+shouldBeClosed?: boolean, | +shouldBeClosed?: boolean, | ||||
+onClose?: () => mixed, | +onClose?: () => mixed, | ||||
+initMessage: ?ConnectionInitializationMessage, | |||||
+secondaryTunnelbrokerConnection?: SecondaryTunnelbrokerConnection, | +secondaryTunnelbrokerConnection?: SecondaryTunnelbrokerConnection, | ||||
}; | }; | ||||
function getTunnelbrokerDeviceType(): TunnelbrokerDeviceTypes { | |||||
return isWebPlatform(getConfig().platformDetails.platform) ? 'web' : 'mobile'; | |||||
} | |||||
function createAnonymousInitMessage( | function createAnonymousInitMessage( | ||||
deviceID: string, | deviceID: string, | ||||
): AnonymousInitializationMessage { | ): AnonymousInitializationMessage { | ||||
return ({ | return ({ | ||||
type: 'AnonymousInitializationMessage', | type: 'AnonymousInitializationMessage', | ||||
deviceID, | deviceID, | ||||
deviceType: 'mobile', | deviceType: getTunnelbrokerDeviceType(), | ||||
}: AnonymousInitializationMessage); | }: AnonymousInitializationMessage); | ||||
} | } | ||||
function TunnelbrokerProvider(props: Props): React.Node { | function TunnelbrokerProvider(props: Props): React.Node { | ||||
const { | const { children, shouldBeClosed, onClose, secondaryTunnelbrokerConnection } = | ||||
children, | props; | ||||
shouldBeClosed, | |||||
onClose, | const accessToken = useSelector(state => state.commServicesAccessToken); | ||||
initMessage: initMessageProp, | const userID = useSelector(state => state.currentUserInfo?.id); | ||||
secondaryTunnelbrokerConnection, | |||||
} = props; | |||||
const [connected, setConnected] = React.useState(false); | |||||
const listeners = React.useRef<Set<TunnelbrokerSocketListener>>(new Set()); | |||||
const socket = React.useRef<?WebSocket>(null); | |||||
const promises = React.useRef<Promises>({}); | |||||
const heartbeatTimeoutID = React.useRef<?TimeoutID>(); | |||||
const [unauthorizedDeviceID, setUnauthorizedDeviceID] = | const [unauthorizedDeviceID, setUnauthorizedDeviceID] = | ||||
React.useState<?string>(null); | React.useState<?string>(null); | ||||
const isAuthorized = !unauthorizedDeviceID; | const isAuthorized = !unauthorizedDeviceID; | ||||
const identityContext = React.useContext(IdentityClientContext); | const createInitMessage = React.useCallback(async () => { | ||||
invariant(identityContext, 'Identity context should be set'); | |||||
const { identityClient } = identityContext; | |||||
const initMessage = React.useMemo(() => { | |||||
if (shouldBeClosed) { | if (shouldBeClosed) { | ||||
return null; | return null; | ||||
} | } | ||||
if (!unauthorizedDeviceID) { | |||||
return initMessageProp; | if (unauthorizedDeviceID) { | ||||
} | |||||
return createAnonymousInitMessage(unauthorizedDeviceID); | return createAnonymousInitMessage(unauthorizedDeviceID); | ||||
}, [shouldBeClosed, unauthorizedDeviceID, initMessageProp]); | } | ||||
if (!accessToken || !userID) { | |||||
return null; | |||||
} | |||||
const deviceID = await getContentSigningKey(); | |||||
if (!deviceID) { | |||||
return null; | |||||
} | |||||
return ({ | |||||
type: 'ConnectionInitializationMessage', | |||||
deviceID, | |||||
accessToken, | |||||
userID, | |||||
deviceType: getTunnelbrokerDeviceType(), | |||||
}: ConnectionInitializationMessage); | |||||
}, [accessToken, shouldBeClosed, unauthorizedDeviceID, userID]); | |||||
const previousInitMessage = | const previousInitMessage = | ||||
React.useRef<?TunnelbrokerInitializationMessage>(initMessage); | React.useRef<?TunnelbrokerInitializationMessage>(null); | ||||
const initMessageChanged = initMessage !== previousInitMessage.current; | |||||
previousInitMessage.current = initMessage; | const [connected, setConnected] = React.useState(false); | ||||
const listeners = React.useRef<Set<TunnelbrokerSocketListener>>(new Set()); | |||||
const socket = React.useRef<?WebSocket>(null); | |||||
const promises = React.useRef<Promises>({}); | |||||
const heartbeatTimeoutID = React.useRef<?TimeoutID>(); | |||||
const identityContext = React.useContext(IdentityClientContext); | |||||
invariant(identityContext, 'Identity context should be set'); | |||||
const { identityClient } = identityContext; | |||||
const stopHeartbeatTimeout = React.useCallback(() => { | const stopHeartbeatTimeout = React.useCallback(() => { | ||||
if (heartbeatTimeoutID.current) { | if (heartbeatTimeoutID.current) { | ||||
clearTimeout(heartbeatTimeoutID.current); | clearTimeout(heartbeatTimeoutID.current); | ||||
heartbeatTimeoutID.current = null; | heartbeatTimeoutID.current = null; | ||||
} | } | ||||
}, []); | }, []); | ||||
const resetHeartbeatTimeout = React.useCallback(() => { | const resetHeartbeatTimeout = React.useCallback(() => { | ||||
stopHeartbeatTimeout(); | stopHeartbeatTimeout(); | ||||
heartbeatTimeoutID.current = setTimeout(() => { | heartbeatTimeoutID.current = setTimeout(() => { | ||||
socket.current?.close(); | socket.current?.close(); | ||||
setConnected(false); | setConnected(false); | ||||
}, tunnelbrokerHeartbeatTimeout); | }, tunnelbrokerHeartbeatTimeout); | ||||
}, [stopHeartbeatTimeout]); | }, [stopHeartbeatTimeout]); | ||||
// determine if the socket is active (not closed or closing) | // determine if the socket is active (not closed or closing) | ||||
const isSocketActive = | const isSocketActive = | ||||
socket.current?.readyState === WebSocket.OPEN || | socket.current?.readyState === WebSocket.OPEN || | ||||
socket.current?.readyState === WebSocket.CONNECTING; | socket.current?.readyState === WebSocket.CONNECTING; | ||||
const connectionChangePromise = React.useRef<?Promise<void>>(null); | |||||
// The Tunnelbroker connection can have 4 states: | // The Tunnelbroker connection can have 4 states: | ||||
// - DISCONNECTED: isSocketActive = false, connected = false | // - DISCONNECTED: isSocketActive = false, connected = false | ||||
// Should be in this state when initMessage is null | // Should be in this state when initMessage is null | ||||
// - CONNECTING: isSocketActive = true, connected = false | // - CONNECTING: isSocketActive = true, connected = false | ||||
// This lasts until Tunnelbroker sends ConnectionInitializationResponse | // This lasts until Tunnelbroker sends ConnectionInitializationResponse | ||||
// - CONNECTED: isSocketActive = true, connected = true | // - CONNECTED: isSocketActive = true, connected = true | ||||
// - DISCONNECTING: isSocketActive = false, connected = true | // - DISCONNECTING: isSocketActive = false, connected = true | ||||
// This lasts between socket.close() and socket.onclose() | // This lasts between socket.close() and socket.onclose() | ||||
React.useEffect(() => { | React.useEffect(() => { | ||||
// when initMessage changes, we need to close the socket and open a new one | connectionChangePromise.current = (async () => { | ||||
await connectionChangePromise.current; | |||||
try { | |||||
const initMessage = await createInitMessage(); | |||||
const initMessageChanged = !_isEqual( | |||||
previousInitMessage.current, | |||||
initMessage, | |||||
); | |||||
previousInitMessage.current = initMessage; | |||||
// when initMessage changes, we need to close the socket | |||||
// and open a new one | |||||
if ( | if ( | ||||
(!initMessage || initMessageChanged) && | (!initMessage || initMessageChanged) && | ||||
isSocketActive && | isSocketActive && | ||||
socket.current | socket.current | ||||
) { | ) { | ||||
socket.current?.close(); | socket.current?.close(); | ||||
return; | return; | ||||
} | } | ||||
// when we're already connected (or pending disconnection), | // when we're already connected (or pending disconnection), | ||||
// or there's no init message to start with, we don't need to do anything | // or there's no init message to start with, we don't need | ||||
// to do anything | |||||
if (connected || !initMessage || socket.current) { | if (connected || !initMessage || socket.current) { | ||||
return; | return; | ||||
} | } | ||||
const tunnelbrokerSocket = new WebSocket(tunnnelbrokerURL); | const tunnelbrokerSocket = new WebSocket(tunnnelbrokerURL); | ||||
tunnelbrokerSocket.onopen = () => { | tunnelbrokerSocket.onopen = () => { | ||||
tunnelbrokerSocket.send(JSON.stringify(initMessage)); | tunnelbrokerSocket.send(JSON.stringify(initMessage)); | ||||
}; | }; | ||||
tunnelbrokerSocket.onclose = () => { | tunnelbrokerSocket.onclose = () => { | ||||
// this triggers the effect hook again and reconnect | // this triggers the effect hook again and reconnect | ||||
setConnected(false); | setConnected(false); | ||||
onClose?.(); | onClose?.(); | ||||
socket.current = null; | socket.current = null; | ||||
console.log('Connection to Tunnelbroker closed'); | console.log('Connection to Tunnelbroker closed'); | ||||
}; | }; | ||||
tunnelbrokerSocket.onerror = e => { | tunnelbrokerSocket.onerror = e => { | ||||
console.log('Tunnelbroker socket error:', e.message); | console.log('Tunnelbroker socket error:', e.message); | ||||
}; | }; | ||||
tunnelbrokerSocket.onmessage = (event: MessageEvent) => { | tunnelbrokerSocket.onmessage = (event: MessageEvent) => { | ||||
if (typeof event.data !== 'string') { | if (typeof event.data !== 'string') { | ||||
console.log('socket received a non-string message'); | console.log('socket received a non-string message'); | ||||
return; | return; | ||||
} | } | ||||
let rawMessage; | let rawMessage; | ||||
try { | try { | ||||
rawMessage = JSON.parse(event.data); | rawMessage = JSON.parse(event.data); | ||||
} catch (e) { | } catch (e) { | ||||
console.log('error while parsing Tunnelbroker message:', e.message); | console.log('error while parsing Tunnelbroker message:', e.message); | ||||
return; | return; | ||||
} | } | ||||
if (!tunnelbrokerMessageValidator.is(rawMessage)) { | if (!tunnelbrokerMessageValidator.is(rawMessage)) { | ||||
console.log('invalid TunnelbrokerMessage'); | console.log('invalid TunnelbrokerMessage'); | ||||
return; | return; | ||||
} | } | ||||
const message: TunnelbrokerMessage = rawMessage; | const message: TunnelbrokerMessage = rawMessage; | ||||
resetHeartbeatTimeout(); | resetHeartbeatTimeout(); | ||||
for (const listener of listeners.current) { | for (const listener of listeners.current) { | ||||
listener(message); | listener(message); | ||||
} | } | ||||
if ( | if ( | ||||
message.type === | message.type === | ||||
tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE | tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE | ||||
) { | ) { | ||||
if (message.status.type === 'Success' && !connected) { | if (message.status.type === 'Success' && !connected) { | ||||
setConnected(true); | setConnected(true); | ||||
console.log( | console.log( | ||||
'session with Tunnelbroker created. isAuthorized:', | 'session with Tunnelbroker created. isAuthorized:', | ||||
isAuthorized, | isAuthorized, | ||||
); | ); | ||||
} else if (message.status.type === 'Success' && connected) { | } else if (message.status.type === 'Success' && connected) { | ||||
console.log( | console.log( | ||||
'received ConnectionInitializationResponse with status: Success for already connected socket', | 'received ConnectionInitializationResponse with status: Success for already connected socket', | ||||
); | ); | ||||
} else { | } else { | ||||
setConnected(false); | setConnected(false); | ||||
console.log( | console.log( | ||||
'creating session with Tunnelbroker error:', | 'creating session with Tunnelbroker error:', | ||||
message.status.data, | message.status.data, | ||||
); | ); | ||||
} | } | ||||
} else if (message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) { | } else if ( | ||||
message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE | |||||
) { | |||||
const confirmation: MessageReceiveConfirmation = { | const confirmation: MessageReceiveConfirmation = { | ||||
type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, | type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, | ||||
messageIDs: [message.messageID], | messageIDs: [message.messageID], | ||||
}; | }; | ||||
socket.current?.send(JSON.stringify(confirmation)); | socket.current?.send(JSON.stringify(confirmation)); | ||||
let rawPeerToPeerMessage; | let rawPeerToPeerMessage; | ||||
try { | try { | ||||
rawPeerToPeerMessage = JSON.parse(message.payload); | rawPeerToPeerMessage = JSON.parse(message.payload); | ||||
} catch (e) { | } catch (e) { | ||||
console.log( | console.log( | ||||
'error while parsing Tunnelbroker peer-to-peer message:', | 'error while parsing Tunnelbroker peer-to-peer message:', | ||||
e.message, | e.message, | ||||
); | ); | ||||
return; | return; | ||||
} | } | ||||
if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { | if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { | ||||
console.log('invalid Tunnelbroker PeerToPeerMessage'); | console.log('invalid Tunnelbroker PeerToPeerMessage'); | ||||
return; | return; | ||||
} | } | ||||
const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; | const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; | ||||
void peerToPeerMessageHandler(peerToPeerMessage, identityClient); | void peerToPeerMessageHandler(peerToPeerMessage, identityClient); | ||||
} else if ( | } else if ( | ||||
message.type === | message.type === | ||||
tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS | tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS | ||||
) { | ) { | ||||
for (const status: MessageSentStatus of message.clientMessageIDs) { | for (const status: MessageSentStatus of message.clientMessageIDs) { | ||||
if (status.type === 'Success') { | if (status.type === 'Success') { | ||||
promises.current[status.data]?.resolve(); | promises.current[status.data]?.resolve(); | ||||
delete promises.current[status.data]; | delete promises.current[status.data]; | ||||
} else if (status.type === 'Error') { | } else if (status.type === 'Error') { | ||||
promises.current[status.data.id]?.reject(status.data.error); | promises.current[status.data.id]?.reject(status.data.error); | ||||
delete promises.current[status.data.id]; | delete promises.current[status.data.id]; | ||||
} else if (status.type === 'SerializationError') { | } else if (status.type === 'SerializationError') { | ||||
console.log('SerializationError for message: ', status.data); | console.log('SerializationError for message: ', status.data); | ||||
} else if (status.type === 'InvalidRequest') { | } else if (status.type === 'InvalidRequest') { | ||||
console.log('Tunnelbroker recorded InvalidRequest'); | console.log('Tunnelbroker recorded InvalidRequest'); | ||||
} | } | ||||
} | } | ||||
} else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { | } else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { | ||||
const heartbeat: Heartbeat = { | const heartbeat: Heartbeat = { | ||||
type: tunnelbrokerMessageTypes.HEARTBEAT, | type: tunnelbrokerMessageTypes.HEARTBEAT, | ||||
}; | }; | ||||
socket.current?.send(JSON.stringify(heartbeat)); | socket.current?.send(JSON.stringify(heartbeat)); | ||||
} | } | ||||
}; | }; | ||||
socket.current = tunnelbrokerSocket; | socket.current = tunnelbrokerSocket; | ||||
} catch (err) { | |||||
console.log('Tunnelbroker connection error:', err); | |||||
} | |||||
})(); | |||||
}, [ | }, [ | ||||
connected, | connected, | ||||
initMessage, | |||||
initMessageChanged, | |||||
isSocketActive, | isSocketActive, | ||||
isAuthorized, | isAuthorized, | ||||
resetHeartbeatTimeout, | resetHeartbeatTimeout, | ||||
stopHeartbeatTimeout, | stopHeartbeatTimeout, | ||||
identityClient, | identityClient, | ||||
onClose, | onClose, | ||||
createInitMessage, | |||||
]); | ]); | ||||
const sendMessageToDeviceRequest: ( | const sendMessageToDeviceRequest: ( | ||||
request: MessageToDeviceRequest, | request: MessageToDeviceRequest, | ||||
) => Promise<void> = React.useCallback( | ) => Promise<void> = React.useCallback( | ||||
request => { | request => { | ||||
return new Promise((resolve, reject) => { | return new Promise((resolve, reject) => { | ||||
const socketActive = connected && socket.current; | const socketActive = connected && socket.current; | ||||
▲ Show 20 Lines • Show All 116 Lines • Show Last 20 Lines |