Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3382563
D7691.id28758.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Referenced Files
None
Subscribers
None
D7691.id28758.diff
View Options
diff --git a/keyserver/package.json b/keyserver/package.json
--- a/keyserver/package.json
+++ b/keyserver/package.json
@@ -83,7 +83,8 @@
"twin-bcrypt": "^2.1.1",
"uuid": "^3.4.0",
"web": "0.0.1",
- "web-push": "^3.5.0"
+ "web-push": "^3.5.0",
+ "ws": "^8.13.0"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
diff --git a/keyserver/src/keyserver.js b/keyserver/src/keyserver.js
--- a/keyserver/src/keyserver.js
+++ b/keyserver/src/keyserver.js
@@ -25,6 +25,7 @@
} from './responders/website-responders.js';
import { webWorkerResponder } from './responders/webworker-responders.js';
import { onConnection } from './socket/socket.js';
+import { createAndMaintainTunnelbrokerWebsocket } from './socket/tunnelbroker.js';
import {
multerProcessor,
multimediaUploadResponder,
@@ -62,7 +63,8 @@
// Allow login to be optional until staging environment is available
try {
- await verifyUserLoggedIn();
+ const identityInfo = await verifyUserLoggedIn();
+ createAndMaintainTunnelbrokerWebsocket(identityInfo);
} catch (e) {
console.warn('failed_identity_login');
}
diff --git a/keyserver/src/socket/tunnelbroker.js b/keyserver/src/socket/tunnelbroker.js
new file mode 100644
--- /dev/null
+++ b/keyserver/src/socket/tunnelbroker.js
@@ -0,0 +1,62 @@
+// @flow
+
+import WebSocket from 'ws';
+
+import { type TBKeyserverConnectionInitializationMessage } from 'lib/types/tunnelbroker-messages.js';
+import sleep from 'lib/utils/sleep.js';
+
+import { fetchOlmAccount } from '../updaters/olm-account-updater.js';
+import type { IdentityInfo } from '../user/identity.js';
+
+async function createAndMaintainTunnelbrokerWebsocket(
+ identityInfo: IdentityInfo,
+) {
+ const accountInfo = await fetchOlmAccount('content');
+ const deviceId = JSON.parse(accountInfo.account.identity_keys()).curve25519;
+
+ await openTunnelbrokerConnection(
+ deviceId,
+ identityInfo.userId,
+ identityInfo.accessToken,
+ );
+}
+
+async function openTunnelbrokerConnection(
+ deviceId: string,
+ userId: string,
+ accessToken: string,
+) {
+ try {
+ const tunnelbrokerSocket = new WebSocket('ws://localhost:51001');
+
+ tunnelbrokerSocket.on('open', () => {
+ const message: TBKeyserverConnectionInitializationMessage = {
+ type: 'sessionRequest',
+ accessToken,
+ deviceId,
+ deviceType: 'keyserver',
+ userId,
+ };
+
+ console.log(
+ 'Sending message to tunnelbroker: ' + JSON.stringify(message),
+ );
+ tunnelbrokerSocket.send(JSON.stringify(message));
+ });
+
+ tunnelbrokerSocket.on('close', async () => {
+ console.log('Connection to tunnelbroker closed');
+ await sleep(1000);
+ console.log('Attempting to re-establish tunnelbroker connection');
+ openTunnelbrokerConnection(deviceId, userId, accessToken);
+ });
+
+ tunnelbrokerSocket.on('error', (error: Error) => {
+ console.error('Tunnelbroker socket error: ' + error.message);
+ });
+ } catch (err) {
+ console.log('Failed to open connection with tunnelbroker');
+ }
+}
+
+export { createAndMaintainTunnelbrokerWebsocket };
diff --git a/lib/types/tunnelbroker-messages.js b/lib/types/tunnelbroker-messages.js
new file mode 100644
--- /dev/null
+++ b/lib/types/tunnelbroker-messages.js
@@ -0,0 +1,30 @@
+// @flow
+
+type TBSharedConnectionInitializationMessage = {
+ +type: 'sessionRequest',
+ +deviceId: string,
+ +accessToken: string,
+ +deviceAppVersion?: string,
+ +userId: string,
+};
+
+export type TBKeyserverConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'keyserver',
+};
+
+export type TBClientConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'web' | 'mobile',
+};
+
+export type TBNotifyClientConnectionInitializationMessage = {
+ ...TBClientConnectionInitializationMessage,
+ +notifyToken: string,
+ +notifyPlatform: 'apns' | 'fcm' | 'web' | 'wns',
+};
+
+export type TBConnectionInitializationMessage =
+ | TBKeyserverConnectionInitializationMessage
+ | TBClientConnectionInitializationMessage
+ | TBNotifyClientConnectionInitializationMessage;
diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs
--- a/services/commtest/tests/tunnelbroker_integration_test.rs
+++ b/services/commtest/tests/tunnelbroker_integration_test.rs
@@ -20,6 +20,7 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -109,14 +109,13 @@
})
}
- pub async fn handle_websocket_frame_from_device(
- &mut self,
- frame: Message,
- ) -> Result<(), SessionError> {
- match frame {
- Message::Text(payload) => {
- debug!("Received message from device: {}", payload);
- Ok(())
+ pub fn handle_message_from_device(
+ &self,
+ message: &str,
+ ) -> Result<(), serde_json::Error> {
+ match serde_json::from_str::<Messages>(message)? {
+ Messages::ConnectionInitializationMessage(session_info) => {
+ ACTIVE_CONNECTIONS.insert(session_info.device_id, self.tx.clone());
}
Message::Close(_) => {
self.close().await;
diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs
--- a/shared/tunnelbroker_messages/src/messages/mod.rs
+++ b/shared/tunnelbroker_messages/src/messages/mod.rs
@@ -11,5 +11,5 @@
#[serde(untagged)]
pub enum Messages {
RefreshKeysRequest(RefreshKeyRequest),
- SessionRequest(SessionRequest),
+ ConnectionInitializationMessage(ConnectionInitializationMessage),
}
diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs
--- a/shared/tunnelbroker_messages/src/messages/session.rs
+++ b/shared/tunnelbroker_messages/src/messages/session.rs
@@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
/// The workflow when estabilishing a tunnelbroker connection:
-/// - Client sends SessionRequest
+/// - Client sends ConnectionInitializationMessage
/// - Tunnelbroker validates access_token with identity service
/// - Tunnelbroker emits an AMQP message declaring that it has opened a new
/// connection with a given device, so that the respective tunnelbroker
@@ -31,9 +31,10 @@
/// service before continuing with the request.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
-pub struct SessionRequest {
+pub struct ConnectionInitializationMessage {
pub device_id: String,
pub access_token: String,
+ pub user_id: String,
pub notify_token: Option<String>,
pub device_type: DeviceTypes,
pub device_app_version: Option<String>,
@@ -41,7 +42,7 @@
}
#[derive(Serialize, Deserialize)]
-pub struct SessionResponse {
+pub struct ConnectionInitializationResponse {
pub session_id: String,
}
@@ -55,11 +56,13 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
let request =
- serde_json::from_str::<SessionRequest>(example_payload).unwrap();
+ serde_json::from_str::<ConnectionInitializationMessage>(example_payload)
+ .unwrap();
assert_eq!(request.device_id, "foo");
assert_eq!(request.access_token, "xkdeifjsld");
assert_eq!(request.device_os, None);
diff --git a/yarn.lock b/yarn.lock
--- a/yarn.lock
+++ b/yarn.lock
@@ -24354,6 +24354,11 @@
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
+ws@^8.13.0:
+ version "8.13.0"
+ resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0"
+ integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==
+
ws@^8.4.2:
version "8.12.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Nov 29, 11:18 AM (20 h, 48 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2596800
Default Alt Text
D7691.id28758.diff (8 KB)
Attached To
Mode
D7691: [Keyserver] Open websocket connection with tunnelbroker
Attached
Detach File
Event Timeline
Log In to Comment