diff --git a/keyserver/src/socket/tunnelbroker-socket.js b/keyserver/src/socket/tunnelbroker-socket.js index 3bbb13629..21051ceed 100644 --- a/keyserver/src/socket/tunnelbroker-socket.js +++ b/keyserver/src/socket/tunnelbroker-socket.js @@ -1,192 +1,192 @@ // @flow import uuid from 'uuid'; import WebSocket from 'ws'; import { tunnelbrokerHeartbeatTimeout } from 'lib/shared/timeouts.js'; import type { ClientMessageToDevice } from 'lib/tunnelbroker/tunnelbroker-context.js'; import type { Heartbeat } from 'lib/types/tunnelbroker/heartbeat-types.js'; -import { - type RefreshKeyRequest, - refreshKeysRequestValidator, -} from 'lib/types/tunnelbroker/keys-types.js'; import type { MessageReceiveConfirmation } from 'lib/types/tunnelbroker/message-receive-confirmation-types.js'; import type { MessageSentStatus } from 'lib/types/tunnelbroker/message-to-device-request-status-types.js'; import type { MessageToDeviceRequest } from 'lib/types/tunnelbroker/message-to-device-request-types.js'; import { type TunnelbrokerMessage, tunnelbrokerMessageTypes, tunnelbrokerMessageValidator, } from 'lib/types/tunnelbroker/messages.js'; +import { + type RefreshKeyRequest, + refreshKeysRequestValidator, +} from 'lib/types/tunnelbroker/peer-to-peer-message-types.js'; import type { ConnectionInitializationMessage } from 'lib/types/tunnelbroker/session-types.js'; import { uploadNewOneTimeKeys } from '../utils/olm-utils.js'; type PromiseCallbacks = { +resolve: () => void, +reject: (error: string) => void, }; type Promises = { [clientMessageID: string]: PromiseCallbacks }; class TunnelbrokerSocket { ws: WebSocket; connected: boolean; promises: Promises; heartbeatTimeoutID: ?TimeoutID; constructor(socketURL: string, initMessage: ConnectionInitializationMessage) { this.connected = false; this.promises = {}; const socket = new WebSocket(socketURL); socket.on('open', () => { socket.send(JSON.stringify(initMessage)); }); socket.on('close', async () => { this.stopHeartbeatTimeout(); this.connected = false; console.error('Connection to Tunnelbroker closed'); }); socket.on('error', (error: Error) => { console.error('Tunnelbroker socket error:', error.message); }); socket.on('message', this.onMessage); this.ws = socket; } onMessage: (event: ArrayBuffer) => Promise = async ( event: ArrayBuffer, ) => { let rawMessage; try { rawMessage = JSON.parse(event.toString()); } catch (e) { console.error('error while parsing Tunnelbroker message:', e.message); return; } if (!tunnelbrokerMessageValidator.is(rawMessage)) { console.error('invalid TunnelbrokerMessage: ', rawMessage.toString()); return; } const message: TunnelbrokerMessage = rawMessage; this.resetHeartbeatTimeout(); if ( message.type === tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE ) { if (message.status.type === 'Success' && !this.connected) { this.connected = true; console.info('session with Tunnelbroker created'); } else if (message.status.type === 'Success' && this.connected) { console.info( 'received ConnectionInitializationResponse with status: Success for already connected socket', ); } else { this.connected = false; console.error( 'creating session with Tunnelbroker error:', message.status.data, ); } } else if (message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) { const confirmation: MessageReceiveConfirmation = { type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, messageIDs: [message.messageID], }; this.ws.send(JSON.stringify(confirmation)); const { payload } = message; try { const messageToKeyserver = JSON.parse(payload); if (refreshKeysRequestValidator.is(messageToKeyserver)) { const request: RefreshKeyRequest = messageToKeyserver; await uploadNewOneTimeKeys(request.numberOfKeys); } } catch (e) { console.error( 'error while processing message to keyserver:', e.message, ); } } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS ) { for (const status: MessageSentStatus of message.clientMessageIDs) { if (status.type === 'Success') { if (this.promises[status.data]) { this.promises[status.data].resolve(); delete this.promises[status.data]; } else { console.log( 'received successful response for a non-existent request', ); } } else if (status.type === 'Error') { if (this.promises[status.data.id]) { this.promises[status.data.id].reject(status.data.error); delete this.promises[status.data.id]; } else { console.log('received error response for a non-existent request'); } } else if (status.type === 'SerializationError') { console.error('SerializationError for message: ', status.data); } else if (status.type === 'InvalidRequest') { console.log('Tunnelbroker recorded InvalidRequest'); } } } else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { const heartbeat: Heartbeat = { type: tunnelbrokerMessageTypes.HEARTBEAT, }; this.ws.send(JSON.stringify(heartbeat)); } }; sendMessage: (message: ClientMessageToDevice) => Promise = ( message: ClientMessageToDevice, ) => { if (!this.connected) { throw new Error('Tunnelbroker not connected'); } const clientMessageID = uuid.v4(); const messageToDevice: MessageToDeviceRequest = { type: tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST, clientMessageID, deviceID: message.deviceID, payload: message.payload, }; return new Promise((resolve, reject) => { this.promises[clientMessageID] = { resolve, reject, }; this.ws.send(JSON.stringify(messageToDevice)); }); }; stopHeartbeatTimeout() { if (this.heartbeatTimeoutID) { clearTimeout(this.heartbeatTimeoutID); this.heartbeatTimeoutID = null; } } resetHeartbeatTimeout() { this.stopHeartbeatTimeout(); this.heartbeatTimeoutID = setTimeout(() => { this.ws.close(); this.connected = false; }, tunnelbrokerHeartbeatTimeout); } } export default TunnelbrokerSocket; diff --git a/lib/types/tunnelbroker/keys-types.js b/lib/types/tunnelbroker/keys-types.js deleted file mode 100644 index f84c2df62..000000000 --- a/lib/types/tunnelbroker/keys-types.js +++ /dev/null @@ -1,19 +0,0 @@ -// @flow - -import type { TInterface } from 'tcomb'; -import t from 'tcomb'; - -import { tShape, tString } from '../../utils/validation-utils.js'; - -export type RefreshKeyRequest = { - +type: 'RefreshKeyRequest', - +deviceID: string, - +numberOfKeys: number, -}; - -export const refreshKeysRequestValidator: TInterface = - tShape({ - type: tString('RefreshKeyRequest'), - deviceID: t.String, - numberOfKeys: t.Number, - }); diff --git a/lib/types/tunnelbroker/messages.js b/lib/types/tunnelbroker/messages.js index d8a0ae883..1bcfcc340 100644 --- a/lib/types/tunnelbroker/messages.js +++ b/lib/types/tunnelbroker/messages.js @@ -1,78 +1,71 @@ // @flow import type { TUnion } from 'tcomb'; import t from 'tcomb'; import { type ConnectionInitializationResponse, connectionInitializationResponseValidator, } from './connection-initialization-response-types.js'; import { type Heartbeat, heartbeatValidator } from './heartbeat-types.js'; -import { - type RefreshKeyRequest, - refreshKeysRequestValidator, -} from './keys-types.js'; import { type MessageReceiveConfirmation, messageReceiveConfirmationValidator, } from './message-receive-confirmation-types.js'; import { type MessageToDeviceRequestStatus, messageToDeviceRequestStatusValidator, } from './message-to-device-request-status-types.js'; import { type MessageToDeviceRequest, messageToDeviceRequestValidator, } from './message-to-device-request-types.js'; import { type MessageToDevice, messageToDeviceValidator, } from './message-to-device-types.js'; import { type ConnectionInitializationMessage, connectionInitializationMessageValidator, } from './session-types.js'; /* * This file defines types and validation for messages exchanged * with the Tunnelbroker. The definitions in this file should remain in sync * with the structures defined in the corresponding * Rust file at `shared/tunnelbroker_messages/src/messages/mod.rs`. * * If you edit the definitions in one file, * please make sure to update the corresponding definitions in the other. * */ export const tunnelbrokerMessageTypes = Object.freeze({ - REFRESH_KEYS_REQUEST: 'RefreshKeyRequest', CONNECTION_INITIALIZATION_MESSAGE: 'ConnectionInitializationMessage', CONNECTION_INITIALIZATION_RESPONSE: 'ConnectionInitializationResponse', MESSAGE_TO_DEVICE_REQUEST_STATUS: 'MessageToDeviceRequestStatus', MESSAGE_TO_DEVICE_REQUEST: 'MessageToDeviceRequest', MESSAGE_TO_DEVICE: 'MessageToDevice', MESSAGE_RECEIVE_CONFIRMATION: 'MessageReceiveConfirmation', HEARTBEAT: 'Heartbeat', }); export const tunnelbrokerMessageValidator: TUnion = t.union([ - refreshKeysRequestValidator, connectionInitializationMessageValidator, connectionInitializationResponseValidator, messageToDeviceRequestStatusValidator, messageToDeviceRequestValidator, messageToDeviceValidator, messageReceiveConfirmationValidator, heartbeatValidator, ]); export type TunnelbrokerMessage = - | RefreshKeyRequest | ConnectionInitializationMessage | ConnectionInitializationResponse | MessageToDeviceRequestStatus | MessageToDeviceRequest | MessageToDevice | MessageReceiveConfirmation | Heartbeat; diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs index 2f0cfc084..c85c073c2 100644 --- a/shared/tunnelbroker_messages/src/messages/mod.rs +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -1,45 +1,50 @@ //! Messages sent between Tunnelbroker and a device. pub mod connection_initialization_response; pub mod heartbeat; pub mod keys; pub mod message_receive_confirmation; pub mod message_to_device; pub mod message_to_device_request; pub mod message_to_device_request_status; pub mod session; pub use connection_initialization_response::*; pub use heartbeat::*; pub use keys::*; pub use message_receive_confirmation::*; pub use message_to_device::*; pub use message_to_device_request::*; pub use message_to_device_request_status::*; pub use session::*; use serde::{Deserialize, Serialize}; // This file defines types and validation for messages exchanged // with the Tunnelbroker. The definitions in this file should remain in sync // with the structures defined in the corresponding // JavaScript file at `lib/types/tunnelbroker/messages.js`. // If you edit the definitions in one file, // please make sure to update the corresponding definitions in the other. #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] pub enum Messages { - RefreshKeysRequest(RefreshKeyRequest), ConnectionInitializationMessage(ConnectionInitializationMessage), ConnectionInitializationResponse(ConnectionInitializationResponse), // MessageToDeviceRequestStatus must be placed before MessageToDeviceRequest. // This is due to serde's pattern matching behavior where it prioritizes // the first matching pattern it encounters. MessageToDeviceRequestStatus(MessageToDeviceRequestStatus), MessageToDeviceRequest(MessageToDeviceRequest), MessageToDevice(MessageToDevice), MessageReceiveConfirmation(MessageReceiveConfirmation), Heartbeat(Heartbeat), } + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum PeerToPeerMessages { + RefreshKeysRequest(RefreshKeyRequest), +}