Page MenuHomePhabricator

D7691.id30013.diff
No OneTemporary

D7691.id30013.diff

diff --git a/keyserver/package.json b/keyserver/package.json
--- a/keyserver/package.json
+++ b/keyserver/package.json
@@ -83,7 +83,8 @@
"twin-bcrypt": "^2.1.1",
"uuid": "^3.4.0",
"web": "0.0.1",
- "web-push": "^3.5.0"
+ "web-push": "^3.5.0",
+ "ws": "^8.13.0"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
diff --git a/keyserver/src/keyserver.js b/keyserver/src/keyserver.js
--- a/keyserver/src/keyserver.js
+++ b/keyserver/src/keyserver.js
@@ -14,6 +14,7 @@
import {
jsonHandler,
downloadHandler,
+ handleAsyncPromise,
htmlHandler,
uploadHandler,
} from './responders/handlers.js';
@@ -25,6 +26,7 @@
} from './responders/website-responders.js';
import { webWorkerResponder } from './responders/webworker-responders.js';
import { onConnection } from './socket/socket.js';
+import { createAndMaintainTunnelbrokerWebsocket } from './socket/tunnelbroker.js';
import {
multerProcessor,
multimediaUploadResponder,
@@ -62,7 +64,14 @@
// Allow login to be optional until staging environment is available
try {
- await verifyUserLoggedIn();
+ // We await here to ensure Keyserver has been provisioned a
+ // "commServicesAccessToken". In the future, this will be necessary for
+ // many Keyserver operations.
+ const identityInfo = await verifyUserLoggedIn();
+ // We don't await here, as Tunnelbroker communication is not needed for
+ // normal Keyserver behavior. In addition, this doesn't return information
+ // useful for other Keyserver functions.
+ handleAsyncPromise(createAndMaintainTunnelbrokerWebsocket(identityInfo));
} catch (e) {
console.warn('failed_identity_login');
}
diff --git a/keyserver/src/socket/tunnelbroker.js b/keyserver/src/socket/tunnelbroker.js
new file mode 100644
--- /dev/null
+++ b/keyserver/src/socket/tunnelbroker.js
@@ -0,0 +1,60 @@
+// @flow
+
+import WebSocket from 'ws';
+
+import { type TBKeyserverConnectionInitializationMessage } from 'lib/types/tunnelbroker-messages.js';
+import sleep from 'lib/utils/sleep.js';
+
+import { fetchOlmAccount } from '../updaters/olm-account-updater.js';
+import type { IdentityInfo } from '../user/identity.js';
+
+async function createAndMaintainTunnelbrokerWebsocket(
+ identityInfo: IdentityInfo,
+) {
+ const accountInfo = await fetchOlmAccount('content');
+ const deviceId = JSON.parse(accountInfo.account.identity_keys()).curve25519;
+
+ openTunnelbrokerConnection(
+ deviceId,
+ identityInfo.userId,
+ identityInfo.accessToken,
+ );
+}
+
+function openTunnelbrokerConnection(
+ deviceID: string,
+ userID: string,
+ accessToken: string,
+) {
+ try {
+ const tunnelbrokerSocket = new WebSocket('ws://127.0.0.1:51001');
+
+ tunnelbrokerSocket.on('open', () => {
+ const message: TBKeyserverConnectionInitializationMessage = {
+ type: 'sessionRequest',
+ accessToken,
+ deviceId: deviceID,
+ deviceType: 'keyserver',
+ userId: deviceID,
+ };
+
+ tunnelbrokerSocket.send(JSON.stringify(message));
+ console.info('Connection to Tunnelbroker established');
+ });
+
+ tunnelbrokerSocket.on('close', async () => {
+ console.warn('Connection to Tunnelbroker closed');
+ await sleep(1000);
+ console.info('Attempting to re-establish Tunnelbroker connection');
+ openTunnelbrokerConnection(deviceID, userID, accessToken);
+ });
+
+ tunnelbrokerSocket.on('error', (error: Error) => {
+ console.error('Tunnelbroker socket error: ' + error.message);
+ });
+ } catch {
+ console.log('Failed to open connection with Tunnelbroker');
+ }
+}
+
+export { createAndMaintainTunnelbrokerWebsocket };
diff --git a/lib/types/tunnelbroker-messages.js b/lib/types/tunnelbroker-messages.js
new file mode 100644
--- /dev/null
+++ b/lib/types/tunnelbroker-messages.js
@@ -0,0 +1,30 @@
+// @flow
+
+type TBSharedConnectionInitializationMessage = {
+ +type: 'sessionRequest',
+ +deviceId: string,
+ +accessToken: string,
+ +deviceAppVersion?: string,
+ +userId: string,
+};
+
+export type TBKeyserverConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'keyserver',
+};
+
+export type TBClientConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'web' | 'mobile',
+};
+
+export type TBNotifyClientConnectionInitializationMessage = {
+ ...TBClientConnectionInitializationMessage,
+ +notifyToken: string,
+ +notifyPlatform: 'apns' | 'fcm' | 'web' | 'wns',
+};
+
+export type TBConnectionInitializationMessage =
+ | TBKeyserverConnectionInitializationMessage
+ | TBClientConnectionInitializationMessage
+ | TBNotifyClientConnectionInitializationMessage;
diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs
--- a/services/commtest/tests/tunnelbroker_integration_test.rs
+++ b/services/commtest/tests/tunnelbroker_integration_test.rs
@@ -20,6 +20,7 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -14,7 +14,7 @@
#[arg(long, default_value_t = 51001)]
pub http_port: u16,
/// AMQP server URI
- #[arg(long, default_value_t = String::from("amqp://localhost:5672"))]
+ #[arg(long, default_value_t = String::from("amqp://comm:comm@localhost:5672"))]
pub amqp_uri: String,
/// AWS Localstack service URL
#[arg(env = "LOCALSTACK_ENDPOINT")]
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -49,7 +49,7 @@
let serialized_message = serde_json::from_str::<Messages>(message)?;
match serialized_message {
- Messages::SessionRequest(mut session_info) => {
+ Messages::ConnectionInitializationMessage(mut session_info) => {
let device_info = DeviceInfo {
device_id: session_info.device_id.clone(),
notify_token: session_info.notify_token.take(),
@@ -110,20 +110,12 @@
}
pub async fn handle_websocket_frame_from_device(
- &mut self,
- frame: Message,
+ &self,
+ msg: Message,
) -> Result<(), SessionError> {
- match frame {
- Message::Text(payload) => {
- debug!("Received message from device: {}", payload);
- Ok(())
- }
- Message::Close(_) => {
- self.close().await;
- Ok(())
- }
- _ => Err(SessionError::InvalidMessage),
- }
+ debug!("Received frame: {:?}", msg);
+
+ Ok(())
}
pub async fn next_amqp_message(
diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs
--- a/shared/tunnelbroker_messages/src/messages/mod.rs
+++ b/shared/tunnelbroker_messages/src/messages/mod.rs
@@ -11,5 +11,5 @@
#[serde(untagged)]
pub enum Messages {
RefreshKeysRequest(RefreshKeyRequest),
- SessionRequest(SessionRequest),
+ ConnectionInitializationMessage(ConnectionInitializationMessage),
}
diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs
--- a/shared/tunnelbroker_messages/src/messages/session.rs
+++ b/shared/tunnelbroker_messages/src/messages/session.rs
@@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
/// The workflow when estabilishing a tunnelbroker connection:
-/// - Client sends SessionRequest
+/// - Client sends ConnectionInitializationMessage
/// - Tunnelbroker validates access_token with identity service
/// - Tunnelbroker emits an AMQP message declaring that it has opened a new
/// connection with a given device, so that the respective tunnelbroker
@@ -31,9 +31,10 @@
/// service before continuing with the request.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
-pub struct SessionRequest {
+pub struct ConnectionInitializationMessage {
pub device_id: String,
pub access_token: String,
+ pub user_id: String,
pub notify_token: Option<String>,
pub device_type: DeviceTypes,
pub device_app_version: Option<String>,
@@ -41,7 +42,7 @@
}
#[derive(Serialize, Deserialize)]
-pub struct SessionResponse {
+pub struct ConnectionInitializationResponse {
pub session_id: String,
}
@@ -55,11 +56,13 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
let request =
- serde_json::from_str::<SessionRequest>(example_payload).unwrap();
+ serde_json::from_str::<ConnectionInitializationMessage>(example_payload)
+ .unwrap();
assert_eq!(request.device_id, "foo");
assert_eq!(request.access_token, "xkdeifjsld");
assert_eq!(request.device_os, None);
diff --git a/yarn.lock b/yarn.lock
--- a/yarn.lock
+++ b/yarn.lock
@@ -24407,6 +24407,11 @@
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
+ws@^8.13.0:
+ version "8.13.0"
+ resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0"
+ integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==
+
ws@^8.4.2:
version "8.12.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"

File Metadata

Mime Type
text/plain
Expires
Mon, Nov 25, 1:30 PM (20 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2579924
Default Alt Text
D7691.id30013.diff (9 KB)

Event Timeline