diff --git a/keyserver/src/socket/tunnelbroker-socket.js b/keyserver/src/socket/tunnelbroker-socket.js index 21051ceed..ac7b1d46b 100644 --- a/keyserver/src/socket/tunnelbroker-socket.js +++ b/keyserver/src/socket/tunnelbroker-socket.js @@ -1,192 +1,192 @@ // @flow import uuid from 'uuid'; import WebSocket from 'ws'; import { tunnelbrokerHeartbeatTimeout } from 'lib/shared/timeouts.js'; import type { ClientMessageToDevice } from 'lib/tunnelbroker/tunnelbroker-context.js'; -import type { Heartbeat } from 'lib/types/tunnelbroker/heartbeat-types.js'; import type { MessageReceiveConfirmation } from 'lib/types/tunnelbroker/message-receive-confirmation-types.js'; import type { MessageSentStatus } from 'lib/types/tunnelbroker/message-to-device-request-status-types.js'; import type { MessageToDeviceRequest } from 'lib/types/tunnelbroker/message-to-device-request-types.js'; import { type TunnelbrokerMessage, tunnelbrokerMessageTypes, tunnelbrokerMessageValidator, } from 'lib/types/tunnelbroker/messages.js'; import { type RefreshKeyRequest, refreshKeysRequestValidator, } from 'lib/types/tunnelbroker/peer-to-peer-message-types.js'; import type { ConnectionInitializationMessage } from 'lib/types/tunnelbroker/session-types.js'; +import type { Heartbeat } from 'lib/types/websocket/heartbeat-types.js'; import { uploadNewOneTimeKeys } from '../utils/olm-utils.js'; type PromiseCallbacks = { +resolve: () => void, +reject: (error: string) => void, }; type Promises = { [clientMessageID: string]: PromiseCallbacks }; class TunnelbrokerSocket { ws: WebSocket; connected: boolean; promises: Promises; heartbeatTimeoutID: ?TimeoutID; constructor(socketURL: string, initMessage: ConnectionInitializationMessage) { this.connected = false; this.promises = {}; const socket = new WebSocket(socketURL); socket.on('open', () => { socket.send(JSON.stringify(initMessage)); }); socket.on('close', async () => { this.stopHeartbeatTimeout(); this.connected = false; console.error('Connection to Tunnelbroker closed'); }); socket.on('error', (error: Error) => { console.error('Tunnelbroker socket error:', error.message); }); socket.on('message', this.onMessage); this.ws = socket; } onMessage: (event: ArrayBuffer) => Promise = async ( event: ArrayBuffer, ) => { let rawMessage; try { rawMessage = JSON.parse(event.toString()); } catch (e) { console.error('error while parsing Tunnelbroker message:', e.message); return; } if (!tunnelbrokerMessageValidator.is(rawMessage)) { console.error('invalid TunnelbrokerMessage: ', rawMessage.toString()); return; } const message: TunnelbrokerMessage = rawMessage; this.resetHeartbeatTimeout(); if ( message.type === tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE ) { if (message.status.type === 'Success' && !this.connected) { this.connected = true; console.info('session with Tunnelbroker created'); } else if (message.status.type === 'Success' && this.connected) { console.info( 'received ConnectionInitializationResponse with status: Success for already connected socket', ); } else { this.connected = false; console.error( 'creating session with Tunnelbroker error:', message.status.data, ); } } else if (message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) { const confirmation: MessageReceiveConfirmation = { type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, messageIDs: [message.messageID], }; this.ws.send(JSON.stringify(confirmation)); const { payload } = message; try { const messageToKeyserver = JSON.parse(payload); if (refreshKeysRequestValidator.is(messageToKeyserver)) { const request: RefreshKeyRequest = messageToKeyserver; await uploadNewOneTimeKeys(request.numberOfKeys); } } catch (e) { console.error( 'error while processing message to keyserver:', e.message, ); } } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS ) { for (const status: MessageSentStatus of message.clientMessageIDs) { if (status.type === 'Success') { if (this.promises[status.data]) { this.promises[status.data].resolve(); delete this.promises[status.data]; } else { console.log( 'received successful response for a non-existent request', ); } } else if (status.type === 'Error') { if (this.promises[status.data.id]) { this.promises[status.data.id].reject(status.data.error); delete this.promises[status.data.id]; } else { console.log('received error response for a non-existent request'); } } else if (status.type === 'SerializationError') { console.error('SerializationError for message: ', status.data); } else if (status.type === 'InvalidRequest') { console.log('Tunnelbroker recorded InvalidRequest'); } } } else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { const heartbeat: Heartbeat = { type: tunnelbrokerMessageTypes.HEARTBEAT, }; this.ws.send(JSON.stringify(heartbeat)); } }; sendMessage: (message: ClientMessageToDevice) => Promise = ( message: ClientMessageToDevice, ) => { if (!this.connected) { throw new Error('Tunnelbroker not connected'); } const clientMessageID = uuid.v4(); const messageToDevice: MessageToDeviceRequest = { type: tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST, clientMessageID, deviceID: message.deviceID, payload: message.payload, }; return new Promise((resolve, reject) => { this.promises[clientMessageID] = { resolve, reject, }; this.ws.send(JSON.stringify(messageToDevice)); }); }; stopHeartbeatTimeout() { if (this.heartbeatTimeoutID) { clearTimeout(this.heartbeatTimeoutID); this.heartbeatTimeoutID = null; } } resetHeartbeatTimeout() { this.stopHeartbeatTimeout(); this.heartbeatTimeoutID = setTimeout(() => { this.ws.close(); this.connected = false; }, tunnelbrokerHeartbeatTimeout); } } export default TunnelbrokerSocket; diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js index 3b047aa9f..de45fcb53 100644 --- a/lib/tunnelbroker/tunnelbroker-context.js +++ b/lib/tunnelbroker/tunnelbroker-context.js @@ -1,264 +1,264 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { tunnnelbrokerURL } from '../facts/tunnelbroker.js'; import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js'; -import type { Heartbeat } from '../types/tunnelbroker/heartbeat-types.js'; import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import type { MessageSentStatus } from '../types/tunnelbroker/message-to-device-request-status-types.js'; import type { MessageToDeviceRequest } from '../types/tunnelbroker/message-to-device-request-types.js'; import { type TunnelbrokerMessage, tunnelbrokerMessageTypes, tunnelbrokerMessageValidator, } from '../types/tunnelbroker/messages.js'; import { type PeerToPeerMessage, peerToPeerMessageValidator, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import type { ConnectionInitializationMessage } from '../types/tunnelbroker/session-types.js'; +import type { Heartbeat } from '../types/websocket/heartbeat-types.js'; export type ClientMessageToDevice = { +deviceID: string, +payload: string, }; export type TunnelbrokerSocketListener = ( message: TunnelbrokerMessage, ) => mixed; type PromiseCallbacks = { +resolve: () => void, +reject: (error: string) => void, }; type Promises = { [clientMessageID: string]: PromiseCallbacks }; type TunnelbrokerContextType = { +sendMessage: (message: ClientMessageToDevice) => Promise, +addListener: (listener: TunnelbrokerSocketListener) => void, +removeListener: (listener: TunnelbrokerSocketListener) => void, +connected: boolean, }; const TunnelbrokerContext: React.Context = React.createContext(); type Props = { +children: React.Node, +initMessage: ?ConnectionInitializationMessage, +peerToPeerMessageHandler?: (message: PeerToPeerMessage) => mixed, }; function TunnelbrokerProvider(props: Props): React.Node { const { children, initMessage, peerToPeerMessageHandler } = props; const [connected, setConnected] = React.useState(false); const listeners = React.useRef>(new Set()); const socket = React.useRef(null); const promises = React.useRef({}); const heartbeatTimeoutID = React.useRef(); const stopHeartbeatTimeout = React.useCallback(() => { if (heartbeatTimeoutID.current) { clearTimeout(heartbeatTimeoutID.current); heartbeatTimeoutID.current = null; } }, []); const resetHeartbeatTimeout = React.useCallback(() => { stopHeartbeatTimeout(); heartbeatTimeoutID.current = setTimeout(() => { socket.current?.close(); setConnected(false); }, tunnelbrokerHeartbeatTimeout); }, [stopHeartbeatTimeout]); React.useEffect(() => { if (connected || !initMessage) { return; } const tunnelbrokerSocket = new WebSocket(tunnnelbrokerURL); tunnelbrokerSocket.onopen = () => { tunnelbrokerSocket.send(JSON.stringify(initMessage)); }; tunnelbrokerSocket.onclose = () => { setConnected(false); console.log('Connection to Tunnelbroker closed'); }; tunnelbrokerSocket.onerror = e => { console.log('Tunnelbroker socket error:', e.message); }; tunnelbrokerSocket.onmessage = (event: MessageEvent) => { if (typeof event.data !== 'string') { console.log('socket received a non-string message'); return; } let rawMessage; try { rawMessage = JSON.parse(event.data); } catch (e) { console.log('error while parsing Tunnelbroker message:', e.message); return; } if (!tunnelbrokerMessageValidator.is(rawMessage)) { console.log('invalid TunnelbrokerMessage'); return; } const message: TunnelbrokerMessage = rawMessage; resetHeartbeatTimeout(); for (const listener of listeners.current) { listener(message); } if ( message.type === tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE ) { if (message.status.type === 'Success' && !connected) { setConnected(true); console.log('session with Tunnelbroker created'); } else if (message.status.type === 'Success' && connected) { console.log( 'received ConnectionInitializationResponse with status: Success for already connected socket', ); } else { setConnected(false); console.log( 'creating session with Tunnelbroker error:', message.status.data, ); } } else if (message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) { const confirmation: MessageReceiveConfirmation = { type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, messageIDs: [message.messageID], }; socket.current?.send(JSON.stringify(confirmation)); if (!peerToPeerMessageHandler) { return; } let rawPeerToPeerMessage; try { rawPeerToPeerMessage = JSON.parse(message.payload); } catch (e) { console.log( 'error while parsing Tunnelbroker peer-to-peer message:', e.message, ); return; } if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { console.log('invalid Tunnelbroker PeerToPeerMessage'); return; } const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; peerToPeerMessageHandler(peerToPeerMessage); } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS ) { for (const status: MessageSentStatus of message.clientMessageIDs) { if (status.type === 'Success') { promises.current[status.data]?.resolve(); delete promises.current[status.data]; } else if (status.type === 'Error') { promises.current[status.data.id]?.reject(status.data.error); delete promises.current[status.data.id]; } else if (status.type === 'SerializationError') { console.log('SerializationError for message: ', status.data); } else if (status.type === 'InvalidRequest') { console.log('Tunnelbroker recorded InvalidRequest'); } } } else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { const heartbeat: Heartbeat = { type: tunnelbrokerMessageTypes.HEARTBEAT, }; socket.current?.send(JSON.stringify(heartbeat)); } }; socket.current = tunnelbrokerSocket; }, [ connected, initMessage, resetHeartbeatTimeout, stopHeartbeatTimeout, peerToPeerMessageHandler, ]); const sendMessage: (message: ClientMessageToDevice) => Promise = React.useCallback( (message: ClientMessageToDevice) => { if (!connected || !socket.current) { throw new Error('Tunnelbroker not connected'); } const clientMessageID = uuid.v4(); const messageToDevice: MessageToDeviceRequest = { type: tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST, clientMessageID, deviceID: message.deviceID, payload: message.payload, }; return new Promise((resolve, reject) => { promises.current[clientMessageID] = { resolve, reject, }; socket.current?.send(JSON.stringify(messageToDevice)); }); }, [connected], ); const addListener = React.useCallback( (listener: TunnelbrokerSocketListener) => { listeners.current.add(listener); }, [], ); const removeListener = React.useCallback( (listener: TunnelbrokerSocketListener) => { listeners.current.delete(listener); }, [], ); const value: TunnelbrokerContextType = React.useMemo( () => ({ sendMessage, connected, addListener, removeListener, }), [addListener, connected, removeListener, sendMessage], ); return ( {children} ); } function useTunnelbroker(): TunnelbrokerContextType { const context = React.useContext(TunnelbrokerContext); invariant(context, 'TunnelbrokerContext not found'); return context; } export { TunnelbrokerProvider, useTunnelbroker }; diff --git a/lib/types/tunnelbroker/messages.js b/lib/types/tunnelbroker/messages.js index f76208d40..b2cb1f39b 100644 --- a/lib/types/tunnelbroker/messages.js +++ b/lib/types/tunnelbroker/messages.js @@ -1,72 +1,75 @@ // @flow import type { TUnion } from 'tcomb'; import t from 'tcomb'; -import { - type ConnectionInitializationResponse, - connectionInitializationResponseValidator, -} from './connection-initialization-response-types.js'; -import { type Heartbeat, heartbeatValidator } from './heartbeat-types.js'; import { type MessageReceiveConfirmation, messageReceiveConfirmationValidator, } from './message-receive-confirmation-types.js'; import { type MessageToDeviceRequestStatus, messageToDeviceRequestStatusValidator, } from './message-to-device-request-status-types.js'; import { type MessageToDeviceRequest, messageToDeviceRequestValidator, } from './message-to-device-request-types.js'; import { type MessageToDevice, messageToDeviceValidator, } from './message-to-device-types.js'; import { type ConnectionInitializationMessage, connectionInitializationMessageValidator, } from './session-types.js'; +import { + type ConnectionInitializationResponse, + connectionInitializationResponseValidator, +} from '../websocket/connection-initialization-response-types.js'; +import { + type Heartbeat, + heartbeatValidator, +} from '../websocket/heartbeat-types.js'; /* * This file defines types and validation for messages exchanged * with the Tunnelbroker. The definitions in this file should remain in sync * with the structures defined in the corresponding * Rust file at `shared/tunnelbroker_messages/src/messages/mod.rs`. * * If you edit the definitions in one file, * please make sure to update the corresponding definitions in the other. * */ export const tunnelbrokerMessageTypes = Object.freeze({ CONNECTION_INITIALIZATION_MESSAGE: 'ConnectionInitializationMessage', CONNECTION_INITIALIZATION_RESPONSE: 'ConnectionInitializationResponse', ANONYMOUS_INITIALIZATION_MESSAGE: 'AnonymousInitializationMessage', MESSAGE_TO_DEVICE_REQUEST_STATUS: 'MessageToDeviceRequestStatus', MESSAGE_TO_DEVICE_REQUEST: 'MessageToDeviceRequest', MESSAGE_TO_DEVICE: 'MessageToDevice', MESSAGE_RECEIVE_CONFIRMATION: 'MessageReceiveConfirmation', HEARTBEAT: 'Heartbeat', }); export const tunnelbrokerMessageValidator: TUnion = t.union([ connectionInitializationMessageValidator, connectionInitializationResponseValidator, messageToDeviceRequestStatusValidator, messageToDeviceRequestValidator, messageToDeviceValidator, messageReceiveConfirmationValidator, heartbeatValidator, ]); export type TunnelbrokerMessage = | ConnectionInitializationMessage | ConnectionInitializationResponse | MessageToDeviceRequestStatus | MessageToDeviceRequest | MessageToDevice | MessageReceiveConfirmation | Heartbeat; diff --git a/lib/types/tunnelbroker/connection-initialization-response-types.js b/lib/types/websocket/connection-initialization-response-types.js similarity index 100% rename from lib/types/tunnelbroker/connection-initialization-response-types.js rename to lib/types/websocket/connection-initialization-response-types.js diff --git a/lib/types/tunnelbroker/heartbeat-types.js b/lib/types/websocket/heartbeat-types.js similarity index 100% rename from lib/types/tunnelbroker/heartbeat-types.js rename to lib/types/websocket/heartbeat-types.js