Page MenuHomePhabricator

D7691.id28758.diff
No OneTemporary

D7691.id28758.diff

diff --git a/keyserver/package.json b/keyserver/package.json
--- a/keyserver/package.json
+++ b/keyserver/package.json
@@ -83,7 +83,8 @@
"twin-bcrypt": "^2.1.1",
"uuid": "^3.4.0",
"web": "0.0.1",
- "web-push": "^3.5.0"
+ "web-push": "^3.5.0",
+ "ws": "^8.13.0"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
diff --git a/keyserver/src/keyserver.js b/keyserver/src/keyserver.js
--- a/keyserver/src/keyserver.js
+++ b/keyserver/src/keyserver.js
@@ -25,6 +25,7 @@
} from './responders/website-responders.js';
import { webWorkerResponder } from './responders/webworker-responders.js';
import { onConnection } from './socket/socket.js';
+import { createAndMaintainTunnelbrokerWebsocket } from './socket/tunnelbroker.js';
import {
multerProcessor,
multimediaUploadResponder,
@@ -62,7 +63,8 @@
// Allow login to be optional until staging environment is available
try {
- await verifyUserLoggedIn();
+ const identityInfo = await verifyUserLoggedIn();
+ createAndMaintainTunnelbrokerWebsocket(identityInfo);
} catch (e) {
console.warn('failed_identity_login');
}
diff --git a/keyserver/src/socket/tunnelbroker.js b/keyserver/src/socket/tunnelbroker.js
new file mode 100644
--- /dev/null
+++ b/keyserver/src/socket/tunnelbroker.js
@@ -0,0 +1,62 @@
+// @flow
+
+import WebSocket from 'ws';
+
+import { type TBKeyserverConnectionInitializationMessage } from 'lib/types/tunnelbroker-messages.js';
+import sleep from 'lib/utils/sleep.js';
+
+import { fetchOlmAccount } from '../updaters/olm-account-updater.js';
+import type { IdentityInfo } from '../user/identity.js';
+
+async function createAndMaintainTunnelbrokerWebsocket(
+ identityInfo: IdentityInfo,
+) {
+ const accountInfo = await fetchOlmAccount('content');
+ const deviceId = JSON.parse(accountInfo.account.identity_keys()).curve25519;
+
+ await openTunnelbrokerConnection(
+ deviceId,
+ identityInfo.userId,
+ identityInfo.accessToken,
+ );
+}
+
+async function openTunnelbrokerConnection(
+ deviceId: string,
+ userId: string,
+ accessToken: string,
+) {
+ try {
+ const tunnelbrokerSocket = new WebSocket('ws://localhost:51001');
+
+ tunnelbrokerSocket.on('open', () => {
+ const message: TBKeyserverConnectionInitializationMessage = {
+ type: 'sessionRequest',
+ accessToken,
+ deviceId,
+ deviceType: 'keyserver',
+ userId,
+ };
+
+ console.log(
+ 'Sending message to tunnelbroker: ' + JSON.stringify(message),
+ );
+ tunnelbrokerSocket.send(JSON.stringify(message));
+ });
+
+ tunnelbrokerSocket.on('close', async () => {
+ console.log('Connection to tunnelbroker closed');
+ await sleep(1000);
+ console.log('Attempting to re-establish tunnelbroker connection');
+ openTunnelbrokerConnection(deviceId, userId, accessToken);
+ });
+
+ tunnelbrokerSocket.on('error', (error: Error) => {
+ console.error('Tunnelbroker socket error: ' + error.message);
+ });
+ } catch (err) {
+ console.log('Failed to open connection with tunnelbroker');
+ }
+}
+
+export { createAndMaintainTunnelbrokerWebsocket };
diff --git a/lib/types/tunnelbroker-messages.js b/lib/types/tunnelbroker-messages.js
new file mode 100644
--- /dev/null
+++ b/lib/types/tunnelbroker-messages.js
@@ -0,0 +1,30 @@
+// @flow
+
+// Fields common to every connection initialization message sent to
+// tunnelbroker. `deviceAppVersion` is the only optional field.
+type TBSharedConnectionInitializationMessage = {
+ +type: 'sessionRequest',
+ +deviceId: string,
+ +accessToken: string,
+ +deviceAppVersion?: string,
+ +userId: string,
+};
+
+// Initialization message whose device type is 'keyserver'.
+export type TBKeyserverConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'keyserver',
+};
+
+// Initialization message whose device type is 'web' or 'mobile'.
+export type TBClientConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'web' | 'mobile',
+};
+
+// Client initialization message that additionally carries a notification
+// token and a notification platform.
+export type TBNotifyClientConnectionInitializationMessage = {
+ ...TBClientConnectionInitializationMessage,
+ +notifyToken: string,
+ +notifyPlatform: 'apns' | 'fcm' | 'web' | 'wns',
+};
+
+// Any of the connection initialization message shapes defined above.
+export type TBConnectionInitializationMessage =
+ | TBKeyserverConnectionInitializationMessage
+ | TBClientConnectionInitializationMessage
+ | TBNotifyClientConnectionInitializationMessage;
diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs
--- a/services/commtest/tests/tunnelbroker_integration_test.rs
+++ b/services/commtest/tests/tunnelbroker_integration_test.rs
@@ -20,6 +20,7 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -109,14 +109,13 @@
})
}
- pub async fn handle_websocket_frame_from_device(
- &mut self,
- frame: Message,
- ) -> Result<(), SessionError> {
- match frame {
- Message::Text(payload) => {
- debug!("Received message from device: {}", payload);
- Ok(())
+ pub fn handle_message_from_device(
+ &self,
+ message: &str,
+ ) -> Result<(), serde_json::Error> {
+ match serde_json::from_str::<Messages>(message)? {
+ Messages::ConnectionInitializationMessage(session_info) => {
+ ACTIVE_CONNECTIONS.insert(session_info.device_id, self.tx.clone());
}
Message::Close(_) => {
self.close().await;
diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs
--- a/shared/tunnelbroker_messages/src/messages/mod.rs
+++ b/shared/tunnelbroker_messages/src/messages/mod.rs
@@ -11,5 +11,5 @@
#[serde(untagged)]
pub enum Messages {
RefreshKeysRequest(RefreshKeyRequest),
- SessionRequest(SessionRequest),
+ ConnectionInitializationMessage(ConnectionInitializationMessage),
}
diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs
--- a/shared/tunnelbroker_messages/src/messages/session.rs
+++ b/shared/tunnelbroker_messages/src/messages/session.rs
@@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
/// The workflow when establishing a tunnelbroker connection:
-/// - Client sends SessionRequest
+/// - Client sends ConnectionInitializationMessage
/// - Tunnelbroker validates access_token with identity service
/// - Tunnelbroker emits an AMQP message declaring that it has opened a new
/// connection with a given device, so that the respective tunnelbroker
@@ -31,9 +31,10 @@
/// service before continuing with the request.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
-pub struct SessionRequest {
+pub struct ConnectionInitializationMessage {
pub device_id: String,
pub access_token: String,
+ pub user_id: String,
pub notify_token: Option<String>,
pub device_type: DeviceTypes,
pub device_app_version: Option<String>,
@@ -41,7 +42,7 @@
}
#[derive(Serialize, Deserialize)]
-pub struct SessionResponse {
+pub struct ConnectionInitializationResponse {
pub session_id: String,
}
@@ -55,11 +56,13 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
let request =
- serde_json::from_str::<SessionRequest>(example_payload).unwrap();
+ serde_json::from_str::<ConnectionInitializationMessage>(example_payload)
+ .unwrap();
assert_eq!(request.device_id, "foo");
assert_eq!(request.access_token, "xkdeifjsld");
assert_eq!(request.device_os, None);
diff --git a/yarn.lock b/yarn.lock
--- a/yarn.lock
+++ b/yarn.lock
@@ -24354,6 +24354,11 @@
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
+ws@^8.13.0:
+ version "8.13.0"
+ resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0"
+ integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==
+
ws@^8.4.2:
version "8.12.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 29, 11:18 AM (20 h, 48 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2596800
Default Alt Text
D7691.id28758.diff (8 KB)

Event Timeline