diff --git a/keyserver/src/socket/tunnelbroker.js b/keyserver/src/socket/tunnelbroker.js index 23a3e7a84..56e41e2c4 100644 --- a/keyserver/src/socket/tunnelbroker.js +++ b/keyserver/src/socket/tunnelbroker.js @@ -1,105 +1,105 @@ // @flow import WebSocket from 'ws'; import { refreshKeysTBMessageValidator, type TBKeyserverConnectionInitializationMessage, type MessageFromTunnelbroker, tunnelbrokerMessageTypes, } from 'lib/types/tunnelbroker-messages.js'; import { getCommConfig } from 'lib/utils/comm-config.js'; import { ServerError } from 'lib/utils/errors.js'; import { fetchOlmAccount } from '../updaters/olm-account-updater.js'; import { type IdentityInfo } from '../user/identity.js'; import { uploadNewOneTimeKeys } from '../utils/olm-utils.js'; type TBConnectionInfo = { +url: string, }; async function getTBConnectionInfo(): Promise { const tbConfig = await getCommConfig({ folder: 'facts', name: 'tunnelbroker', }); if (tbConfig) { return tbConfig; } console.warn('Defaulting to local Tunnelbroker instance'); return { url: 'ws://127.0.0.1:51001', }; } async function createAndMaintainTunnelbrokerWebsocket( identityInfo: IdentityInfo, ) { const [accountInfo, tbConnectionInfo] = await Promise.all([ fetchOlmAccount('content'), getTBConnectionInfo(), ]); const deviceID = JSON.parse(accountInfo.account.identity_keys()).ed25519; openTunnelbrokerConnection( deviceID, identityInfo.userId, identityInfo.accessToken, tbConnectionInfo.url, ); } function handleTBMessageEvent(event: ArrayBuffer): Promise { const rawMessage = JSON.parse(event.toString()); if (!refreshKeysTBMessageValidator.is(rawMessage)) { throw new ServerError('unsupported_tunnelbroker_message'); } const message: MessageFromTunnelbroker = rawMessage; if (message.type === tunnelbrokerMessageTypes.REFRESH_KEYS_REQUEST) { return uploadNewOneTimeKeys(message.numberOfKeys); } throw new ServerError('unsupported_tunnelbroker_message'); } function openTunnelbrokerConnection( deviceID: string, userID: string, accessToken: string, tbUrl: string, ) { try { const tunnelbrokerSocket = new WebSocket(tbUrl); tunnelbrokerSocket.on('open', () => { const message: TBKeyserverConnectionInitializationMessage = { type: 'sessionRequest', accessToken, - deviceId: deviceID, + deviceID, deviceType: 'keyserver', - userId: userID, + userID, }; tunnelbrokerSocket.send(JSON.stringify(message)); console.info('Connection to Tunnelbroker established'); }); tunnelbrokerSocket.on('close', async () => { console.warn('Connection to Tunnelbroker closed'); }); tunnelbrokerSocket.on('error', (error: Error) => { console.error('Tunnelbroker socket error', error.message); }); tunnelbrokerSocket.on('message', handleTBMessageEvent); } catch { console.log('Failed to open connection with Tunnelbroker'); } } export { createAndMaintainTunnelbrokerWebsocket }; diff --git a/lib/types/tunnelbroker-messages.js b/lib/types/tunnelbroker-messages.js index 5e871fac2..012fbe180 100644 --- a/lib/types/tunnelbroker-messages.js +++ b/lib/types/tunnelbroker-messages.js @@ -1,55 +1,55 @@ // @flow import t, { type TInterface } from 'tcomb'; import { tShape, tString } from '../utils/validation-utils.js'; type TBSharedConnectionInitializationMessage = { +type: 'sessionRequest', - +deviceId: string, + +deviceID: string, +accessToken: string, +deviceAppVersion?: string, - +userId: string, + +userID: string, }; export type TBKeyserverConnectionInitializationMessage = { ...TBSharedConnectionInitializationMessage, +deviceType: 'keyserver', }; export type TBClientConnectionInitializationMessage = { ...TBSharedConnectionInitializationMessage, +deviceType: 'web' | 'mobile', }; export type TBNotifyClientConnectionInitializationMessage = { ...TBClientConnectionInitializationMessage, +notifyToken: string, +notifyPlatform: 'apns' | 'fcm' | 'web' | 'wns', }; export type MessageToTunnelbroker = | TBKeyserverConnectionInitializationMessage | TBClientConnectionInitializationMessage | TBNotifyClientConnectionInitializationMessage; export const tunnelbrokerMessageTypes = Object.freeze({ REFRESH_KEYS_REQUEST: 'RefreshKeyRequest', }); export type TBRefreshKeysRequest = { +type: 'RefreshKeyRequest', - +deviceId: string, + +deviceID: string, +numberOfKeys: number, }; export const refreshKeysTBMessageValidator: TInterface = tShape({ type: tString('RefreshKeyRequest'), - deviceId: t.String, + deviceID: t.String, numberOfKeys: t.Number, }); // Disjoint enumeration of all messages received from Tunnelbroker // Currently, only a single message export type MessageFromTunnelbroker = TBRefreshKeysRequest; diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs index 95201a2e7..a2192a995 100644 --- a/services/commtest/tests/tunnelbroker_integration_test.rs +++ b/services/commtest/tests/tunnelbroker_integration_test.rs @@ -1,121 +1,121 @@ use futures_util::SinkExt; use tokio_tungstenite::{connect_async, tungstenite::Message}; mod proto { tonic::include_proto!("tunnelbroker"); } use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use tunnelbroker_messages as messages; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn send_refresh_request() { // Create session as a keyserver let (mut socket, _) = connect_async("ws://localhost:51001") .await .expect("Can't connect"); let session_request = r#"{ "type": "sessionRequest", "accessToken": "xkdeifjsld", - "deviceId": "foo", - "userId": "alice", + "deviceID": "foo", + "userID": "alice", "deviceType": "keyserver" }"#; socket .send(Message::Text(session_request.to_string())) .await .expect("Failed to send message"); // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") .await .unwrap(); let refresh_request = messages::RefreshKeyRequest { device_id: "foo".to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: "foo".to_string(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Have keyserver receive any websocket messages let response = socket.next().await.unwrap().unwrap(); // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } /// Test that a message to an offline device gets pushed to dynamodb /// then recalled once a device connects #[tokio::test] async fn presist_messages() { // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") .await .unwrap(); let refresh_request = messages::RefreshKeyRequest { device_id: "bar".to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: "bar".to_string(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Wait one second to ensure that message had time to persist use std::{thread, time}; let ten_millis = time::Duration::from_millis(50); thread::sleep(ten_millis); // Create session as a keyserver let (mut socket, _) = connect_async("ws://localhost:51001") .await .expect("Can't connect"); let session_request = r#"{ "type": "sessionRequest", "accessToken": "xkdexfjsld", - "deviceId": "bar", + "deviceID": "bar", "deviceType": "keyserver" }"#; socket .send(Message::Text(session_request.to_string())) .await .expect("Failed to send message"); // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); }; } diff --git a/shared/tunnelbroker_messages/src/messages/keys.rs b/shared/tunnelbroker_messages/src/messages/keys.rs index f1e1c891d..acd249fba 100644 --- a/shared/tunnelbroker_messages/src/messages/keys.rs +++ b/shared/tunnelbroker_messages/src/messages/keys.rs @@ -1,28 +1,29 @@ // Messages sent between tunnelbroker and a device use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, PartialEq, Debug)] #[serde(tag = "type", rename_all = "camelCase")] pub struct RefreshKeyRequest { + #[serde(rename = "deviceID")] pub device_id: String, pub number_of_keys: u32, } #[cfg(test)] mod key_tests { use super::*; #[test] fn test_refresh_deserialization() { let example_payload = r#"{ "type": "RefreshKeyRequest", - "deviceId": "adfjEDFS", + "deviceID": "adfjEDFS", "numberOfKeys": 6 }"#; let request = serde_json::from_str::(example_payload).unwrap(); assert_eq!(request.number_of_keys, 6); } } diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs index 12f4ad66b..f7aaf81a6 100644 --- a/shared/tunnelbroker_messages/src/messages/session.rs +++ b/shared/tunnelbroker_messages/src/messages/session.rs @@ -1,71 +1,73 @@ // Messages sent between tunnelbroker and a device use serde::{Deserialize, Serialize}; /// The workflow when estabilishing a tunnelbroker connection: /// - Client sends ConnectionInitializationMessage /// - Tunnelbroker validates access_token with identity service /// - Tunnelbroker emits an AMQP message declaring that it has opened a new /// connection with a given device, so that the respective tunnelbroker /// instance can close the existing connection. /// - Tunnelbroker returns a session_id representing that the connection was /// accepted /// - Tunnelbroker will flush all messages related to device from RabbitMQ. /// This must be done first before flushing DynamoDB to prevent duplicated /// messages. /// - Tunnelbroker flushes all messages in DynamoDB /// - Tunnelbroker orders messages by creation date (oldest first), and sends /// messages to device /// - Tunnelbroker then polls for incoming messages from device #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "camelCase")] pub enum DeviceTypes { Mobile, Web, Keyserver, } /// Message sent by a client to tunnelbroker to initiate a websocket /// session. Tunnelbroker will then validate the access token with identity /// service before continuing with the request. #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] pub struct ConnectionInitializationMessage { + #[serde(rename = "deviceID")] pub device_id: String, pub access_token: String, + #[serde(rename = "userID")] pub user_id: String, pub notify_token: Option, pub device_type: DeviceTypes, pub device_app_version: Option, pub device_os: Option, } #[derive(Serialize, Deserialize)] pub struct ConnectionInitializationResponse { pub session_id: String, } #[cfg(test)] mod session_tests { use super::*; #[test] fn test_session_deserialization() { let example_payload = r#"{ "type": "sessionRequest", "accessToken": "xkdeifjsld", - "deviceId": "foo", - "userId": "alice", + "deviceID": "foo", + "userID": "alice", "deviceType": "keyserver" }"#; let request = serde_json::from_str::(example_payload) .unwrap(); assert_eq!(request.device_id, "foo"); assert_eq!(request.access_token, "xkdeifjsld"); assert_eq!(request.device_os, None); assert_eq!(request.device_type, DeviceTypes::Keyserver); } }