diff --git a/keyserver/src/socket/tunnelbroker-socket.js b/keyserver/src/socket/tunnelbroker-socket.js index b40503557..1ba9c377a 100644 --- a/keyserver/src/socket/tunnelbroker-socket.js +++ b/keyserver/src/socket/tunnelbroker-socket.js @@ -1,156 +1,182 @@ // @flow import uuid from 'uuid'; import WebSocket from 'ws'; +import { tunnelbrokerHeartbeatTimeout } from 'lib/shared/timeouts.js'; import type { ClientMessageToDevice } from 'lib/tunnelbroker/tunnelbroker-context.js'; +import type { Heartbeat } from 'lib/types/tunnelbroker/heartbeat-types.js'; import { type RefreshKeyRequest, refreshKeysRequestValidator, } from 'lib/types/tunnelbroker/keys-types.js'; import type { MessageReceiveConfirmation } from 'lib/types/tunnelbroker/message-receive-confirmation-types.js'; import type { MessageSentStatus } from 'lib/types/tunnelbroker/message-to-device-request-status-types.js'; import type { MessageToDeviceRequest } from 'lib/types/tunnelbroker/message-to-device-request-types.js'; import { type TunnelbrokerMessage, tunnelbrokerMessageTypes, tunnelbrokerMessageValidator, } from 'lib/types/tunnelbroker/messages.js'; import type { ConnectionInitializationMessage } from 'lib/types/tunnelbroker/session-types.js'; import { uploadNewOneTimeKeys } from '../utils/olm-utils.js'; type PromiseCallbacks = { +resolve: () => void, +reject: (error: string) => void, }; type Promises = { [clientMessageID: string]: PromiseCallbacks }; class TunnelbrokerSocket { ws: WebSocket; connected: boolean; promises: Promises; + heartbeatTimeoutID: ?TimeoutID; constructor(socketURL: string, initMessage: ConnectionInitializationMessage) { this.connected = false; this.promises = {}; const socket = new WebSocket(socketURL); socket.on('open', () => { socket.send(JSON.stringify(initMessage)); }); socket.on('close', async () => { + this.stopHeartbeatTimeout(); this.connected = false; console.error('Connection to Tunnelbroker closed'); }); socket.on('error', (error: Error) => { console.error('Tunnelbroker socket error:', error.message); }); socket.on('message', this.onMessage); this.ws = socket; } onMessage: (event: ArrayBuffer) => Promise = async ( event: ArrayBuffer, ) => { let rawMessage; try { rawMessage = JSON.parse(event.toString()); } catch (e) { console.error('error while parsing Tunnelbroker message:', e.message); return; } if (!tunnelbrokerMessageValidator.is(rawMessage)) { console.error('invalid TunnelbrokerMessage: ', rawMessage.toString()); return; } const message: TunnelbrokerMessage = rawMessage; + this.resetHeartbeatTimeout(); + if ( message.type === tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE ) { if (message.status.type === 'Success' && !this.connected) { this.connected = true; console.info('session with Tunnelbroker created'); } else if (message.status.type === 'Success' && this.connected) { console.info( 'received ConnectionInitializationResponse with status: Success for already connected socket', ); } else { this.connected = false; console.error( 'creating session with Tunnelbroker error:', message.status.data, ); } } else if (message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE) { const confirmation: MessageReceiveConfirmation = { type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, messageIDs: [message.messageID], }; this.ws.send(JSON.stringify(confirmation)); const { payload } = message; try { const messageToKeyserver = JSON.parse(payload); if (refreshKeysRequestValidator.is(messageToKeyserver)) { const request: RefreshKeyRequest = messageToKeyserver; await uploadNewOneTimeKeys(request.numberOfKeys); } } catch (e) { console.error( 'error while processing message to keyserver:', e.message, ); } } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS ) { for (const status: MessageSentStatus of message.clientMessageIDs) { if (status.type === 'Success') { this.promises[status.data]?.resolve(); delete this.promises[status.data]; } else if (status.type === 'Error') { this.promises[status.data.id]?.reject(status.data.error); delete this.promises[status.data.id]; } else if (status.type === 'SerializationError') { console.error('SerializationError for message: ', status.data); } else if (status.type === 'InvalidRequest') { console.log('Tunnelbroker recorded InvalidRequest'); } } + } else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { + const heartbeat: Heartbeat = { + type: tunnelbrokerMessageTypes.HEARTBEAT, + }; + this.ws.send(JSON.stringify(heartbeat)); } }; sendMessage: (message: ClientMessageToDevice) => Promise = ( message: ClientMessageToDevice, ) => { if (!this.connected) { throw new Error('Tunnelbroker not connected'); } const clientMessageID = uuid.v4(); const messageToDevice: MessageToDeviceRequest = { type: tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST, clientMessageID, deviceID: message.deviceID, payload: message.payload, }; return new Promise((resolve, reject) => { this.promises[clientMessageID] = { resolve, reject, }; this.ws.send(JSON.stringify(messageToDevice)); }); }; + + stopHeartbeatTimeout() { + if (this.heartbeatTimeoutID) { + clearTimeout(this.heartbeatTimeoutID); + this.heartbeatTimeoutID = null; + } + } + + resetHeartbeatTimeout() { + this.stopHeartbeatTimeout(); + this.heartbeatTimeoutID = setTimeout(() => { + this.ws.close(); + this.connected = false; + }, tunnelbrokerHeartbeatTimeout); + } } export default TunnelbrokerSocket; diff --git a/lib/shared/timeouts.js b/lib/shared/timeouts.js index 6a5f0f295..e9fb03f7e 100644 --- a/lib/shared/timeouts.js +++ b/lib/shared/timeouts.js @@ -1,34 +1,38 @@ // @flow // Sometimes the connection can just go "away", without the client knowing it. // To detect this happening, the client hits the server with a "ping" at // interval specified below when there hasn't been any other communication. export const pingFrequency = 3000; // in milliseconds // Time for request to get response after which we consider our connection // questionable. We won't close and reopen the socket yet, but we will visually // indicate to the user that their connection doesn't seem to be working. export const clientRequestVisualTimeout = 5000; // in milliseconds // Time for request to get response after which we assume our socket // is dead. It's possible this happened because of some weird shit on // the server side, so we try to close and reopen the socket in the // hopes that it will fix the problem. Of course, this is rather // unlikely, as the connectivity issue is likely on the client side. export const clientRequestSocketTimeout = 10000; // in milliseconds // Time after which CallServerEndpoint will timeout a request. When using // sockets this is preempted by the above timeout, so it really only applies // for HTTP requests. export const callServerEndpointTimeout = 10000; // in milliseconds // The server expects to get a request at least every three // seconds from the client. If server doesn't get a request // after time window specified below, it will close connection export const serverRequestSocketTimeout = 120000; // in milliseconds // Time server allows itself to respond to client message. If it // takes it longer to respond, it will timeout and send an error // response. This is better than letting the request timeout on the // client, since the client will assume network issues and close the socket. export const serverResponseTimeout = 5000; // in milliseconds + +// Time after which the client consider the Tunnelbroker connection +// as unhealthy and chooses to close the socket. +export const tunnelbrokerHeartbeatTimeout = 9000; // in milliseconds