diff --git a/keyserver/package.json b/keyserver/package.json --- a/keyserver/package.json +++ b/keyserver/package.json @@ -83,7 +83,8 @@ "twin-bcrypt": "^2.1.1", "uuid": "^3.4.0", "web": "0.0.1", - "web-push": "^3.5.0" + "web-push": "^3.5.0", + "ws": "^8.13.0" }, "optionalDependencies": { "bufferutil": "^4.0.5", diff --git a/keyserver/src/keyserver.js b/keyserver/src/keyserver.js --- a/keyserver/src/keyserver.js +++ b/keyserver/src/keyserver.js @@ -14,6 +14,7 @@ import { jsonHandler, downloadHandler, + handleAsyncPromise, htmlHandler, uploadHandler, } from './responders/handlers.js'; @@ -25,6 +26,7 @@ } from './responders/website-responders.js'; import { webWorkerResponder } from './responders/webworker-responders.js'; import { onConnection } from './socket/socket.js'; +import { createAndMaintainTunnelbrokerWebsocket } from './socket/tunnelbroker.js'; import { multerProcessor, multimediaUploadResponder, @@ -62,7 +64,14 @@ // Allow login to be optional until staging environment is available try { - await verifyUserLoggedIn(); + // We await here to ensure that the keyserver has been provisioned a + // commServicesAccessToken. In the future, this will be necessary for + // many keyserver operations. + const identityInfo = await verifyUserLoggedIn(); + // We don't await here, as Tunnelbroker communication is not needed for + // normal keyserver behavior yet. In addition, this doesn't return + // information useful for other keyserver functions. 
+ handleAsyncPromise(createAndMaintainTunnelbrokerWebsocket(identityInfo)); } catch (e) { console.warn('failed_identity_login'); } diff --git a/keyserver/src/socket/tunnelbroker.js b/keyserver/src/socket/tunnelbroker.js new file mode 100644 --- /dev/null +++ b/keyserver/src/socket/tunnelbroker.js @@ -0,0 +1,62 @@ +// @flow + +import WebSocket from 'ws'; + +import { type TBKeyserverConnectionInitializationMessage } from 'lib/types/tunnelbroker-messages.js'; +import sleep from 'lib/utils/sleep.js'; + +import { fetchOlmAccount } from '../updaters/olm-account-updater.js'; +import type { IdentityInfo } from '../user/identity.js'; + +async function createAndMaintainTunnelbrokerWebsocket( + identityInfo: IdentityInfo, +) { + const accountInfo = await fetchOlmAccount('content'); + const deviceID = JSON.parse(accountInfo.account.identity_keys()).curve25519; + + openTunnelbrokerConnection( + deviceID, + identityInfo.userId, + identityInfo.accessToken, + ); +} + +// Opens a WebSocket to Tunnelbroker, sends the session request once the socket +// opens, and calls itself again after `sleep(1000)` whenever the socket closes. +function openTunnelbrokerConnection( + deviceID: string, + userID: string, + accessToken: string, +) { + try { + const tunnelbrokerSocket = new WebSocket('ws://127.0.0.1:51001'); + + tunnelbrokerSocket.on('open', () => { + const message: TBKeyserverConnectionInitializationMessage = { + type: 'sessionRequest', + accessToken, + deviceId: deviceID, + deviceType: 'keyserver', + userId: userID, + }; + + tunnelbrokerSocket.send(JSON.stringify(message)); + console.info('Connection to Tunnelbroker established'); + }); + + tunnelbrokerSocket.on('close', async () => { + console.warn('Connection to Tunnelbroker closed'); + await sleep(1000); + console.info('Attempting to re-establish Tunnelbroker connection'); + openTunnelbrokerConnection(deviceID, userID, accessToken); + }); + + tunnelbrokerSocket.on('error', (error: Error) => { + console.error('Tunnelbroker socket error', error.message); + }); + } catch { + console.log('Failed to open connection with Tunnelbroker'); + } +} + +export { createAndMaintainTunnelbrokerWebsocket 
}; diff --git a/lib/types/tunnelbroker-messages.js b/lib/types/tunnelbroker-messages.js new file mode 100644 --- /dev/null +++ b/lib/types/tunnelbroker-messages.js @@ -0,0 +1,30 @@ +// @flow + +type TBSharedConnectionInitializationMessage = { + +type: 'sessionRequest', + +deviceId: string, + +accessToken: string, + +deviceAppVersion?: string, + +userId: string, +}; + +export type TBKeyserverConnectionInitializationMessage = { + ...TBSharedConnectionInitializationMessage, + +deviceType: 'keyserver', +}; + +export type TBClientConnectionInitializationMessage = { + ...TBSharedConnectionInitializationMessage, + +deviceType: 'web' | 'mobile', +}; + +export type TBNotifyClientConnectionInitializationMessage = { + ...TBClientConnectionInitializationMessage, + +notifyToken: string, + +notifyPlatform: 'apns' | 'fcm' | 'web' | 'wns', +}; + +export type TBConnectionInitializationMessage = + | TBKeyserverConnectionInitializationMessage + | TBClientConnectionInitializationMessage + | TBNotifyClientConnectionInitializationMessage; diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs --- a/services/commtest/tests/tunnelbroker_integration_test.rs +++ b/services/commtest/tests/tunnelbroker_integration_test.rs @@ -20,6 +20,7 @@ "type": "sessionRequest", "accessToken": "xkdeifjsld", "deviceId": "foo", + "userId": "alice", "deviceType": "keyserver" }"#; diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -49,7 +49,7 @@ let serialized_message = serde_json::from_str::(message)?; match serialized_message { - Messages::SessionRequest(mut session_info) => { + Messages::ConnectionInitializationMessage(mut session_info) => { let device_info = DeviceInfo { device_id: session_info.device_id.clone(), notify_token: session_info.notify_token.take(), @@ 
-110,20 +110,12 @@ } pub async fn handle_websocket_frame_from_device( - &mut self, - frame: Message, + &self, + msg: Message, ) -> Result<(), SessionError> { - match frame { - Message::Text(payload) => { - debug!("Received message from device: {}", payload); - Ok(()) - } - Message::Close(_) => { - self.close().await; - Ok(()) - } - _ => Err(SessionError::InvalidMessage), - } + debug!("Received frame: {:?}", msg); + + Ok(()) } pub async fn next_amqp_message( diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs --- a/shared/tunnelbroker_messages/src/messages/mod.rs +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -11,5 +11,5 @@ #[serde(untagged)] pub enum Messages { RefreshKeysRequest(RefreshKeyRequest), - SessionRequest(ConnectionInitializationMessage), + ConnectionInitializationMessage(ConnectionInitializationMessage), } diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs --- a/shared/tunnelbroker_messages/src/messages/session.rs +++ b/shared/tunnelbroker_messages/src/messages/session.rs @@ -2,8 +2,8 @@ use serde::{Deserialize, Serialize}; -/// The workflow when establishing a tunnelbroker connection: -/// - Client sends SessionRequest +/// The workflow when establishing a tunnelbroker connection: +/// - Client sends ConnectionInitializationMessage /// - Tunnelbroker validates access_token with identity service /// - Tunnelbroker emits an AMQP message declaring that it has opened a new /// connection with a given device, so that the respective tunnelbroker @@ -32,9 +32,9 @@ #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] pub struct ConnectionInitializationMessage { - pub user_id: String, pub device_id: String, pub access_token: String, + pub user_id: String, pub notify_token: Option, pub device_type: DeviceTypes, pub device_app_version: Option, @@ -42,7 +42,7 @@ } #[derive(Serialize, 
Deserialize)] -pub struct SessionResponse { +pub struct ConnectionInitializationResponse { pub session_id: String, } diff --git a/yarn.lock b/yarn.lock --- a/yarn.lock +++ b/yarn.lock @@ -24415,6 +24415,11 @@ resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591" integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q== +ws@^8.13.0: + version "8.13.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0" + integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA== + ws@^8.4.2: version "8.12.0" resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"