Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3361723
D7691.id30012.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Referenced Files
None
Subscribers
None
D7691.id30012.diff
View Options
diff --git a/keyserver/package.json b/keyserver/package.json
--- a/keyserver/package.json
+++ b/keyserver/package.json
@@ -83,7 +83,8 @@
"twin-bcrypt": "^2.1.1",
"uuid": "^3.4.0",
"web": "0.0.1",
- "web-push": "^3.5.0"
+ "web-push": "^3.5.0",
+ "ws": "^8.13.0"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
diff --git a/keyserver/src/keyserver.js b/keyserver/src/keyserver.js
--- a/keyserver/src/keyserver.js
+++ b/keyserver/src/keyserver.js
@@ -14,6 +14,7 @@
import {
jsonHandler,
downloadHandler,
+ handleAsyncPromise,
htmlHandler,
uploadHandler,
} from './responders/handlers.js';
@@ -25,6 +26,7 @@
} from './responders/website-responders.js';
import { webWorkerResponder } from './responders/webworker-responders.js';
import { onConnection } from './socket/socket.js';
+import { createAndMaintainTunnelbrokerWebsocket } from './socket/tunnelbroker.js';
import {
multerProcessor,
multimediaUploadResponder,
@@ -62,7 +64,14 @@
// Allow login to be optional until staging environment is available
try {
- await verifyUserLoggedIn();
+ // We await here to ensure that keyserver has been provisioned a
+ // "commServicesAccessToken". In the future, this will be necessary for
+ // many keyserver operations.
+ const identityInfo = await verifyUserLoggedIn();
+ // We don't await here as tunnelbroker communication is not needed for
+ // normal keyserver behavior. In addition, this doesn't return information
+ // useful for other keyserver functions.
+ handleAsyncPromise(createAndMaintainTunnelbrokerWebsocket(identityInfo));
} catch (e) {
console.warn('failed_identity_login');
}
diff --git a/keyserver/src/socket/tunnelbroker.js b/keyserver/src/socket/tunnelbroker.js
new file mode 100644
--- /dev/null
+++ b/keyserver/src/socket/tunnelbroker.js
@@ -0,0 +1,60 @@
+// @flow
+
+import WebSocket from 'ws';
+
+import { type TBKeyserverConnectionInitializationMessage } from 'lib/types/tunnelbroker-messages.js';
+import sleep from 'lib/utils/sleep.js';
+
+import { fetchOlmAccount } from '../updaters/olm-account-updater.js';
+import type { IdentityInfo } from '../user/identity.js';
+
+async function createAndMaintainTunnelbrokerWebsocket(
+ identityInfo: IdentityInfo,
+) {
+ const accountInfo = await fetchOlmAccount('content');
+ const deviceId = JSON.parse(accountInfo.account.identity_keys()).curve25519;
+
+ openTunnelbrokerConnection(
+ deviceId,
+ identityInfo.userId,
+ identityInfo.accessToken,
+ );
+}
+
+function openTunnelbrokerConnection(
+ deviceID: string,
+ userID: string,
+ accessToken: string,
+) {
+ try {
+ const tunnelbrokerSocket = new WebSocket('ws://127.0.0.1:51001');
+
+ tunnelbrokerSocket.on('open', () => {
+ const message: TBKeyserverConnectionInitializationMessage = {
+ type: 'sessionRequest',
+ accessToken,
+ deviceId: deviceID,
+ deviceType: 'keyserver',
+ userId: deviceID,
+ };
+
+ tunnelbrokerSocket.send(JSON.stringify(message));
+ console.info('Connection to Tunnelbroker established');
+ });
+
+ tunnelbrokerSocket.on('close', async () => {
+ console.warn('Connection to Tunnelbroker closed');
+ await sleep(1000);
+ console.info('Attempting to re-establish Tunnelbroker connection');
+ openTunnelbrokerConnection(deviceID, userID, accessToken);
+ });
+
+ tunnelbrokerSocket.on('error', (error: Error) => {
+ console.error('Tunnelbroker socket error: ' + error.message);
+ });
+ } catch {
+ console.log('Failed to open connection with Tunnelbroker');
+ }
+}
+
+export { createAndMaintainTunnelbrokerWebsocket };
diff --git a/lib/types/tunnelbroker-messages.js b/lib/types/tunnelbroker-messages.js
new file mode 100644
--- /dev/null
+++ b/lib/types/tunnelbroker-messages.js
@@ -0,0 +1,30 @@
+// @flow
+
+// Fields shared by every connection initialization message; spread into the
+// keyserver and client variants below. `deviceAppVersion` is the only
+// optional field.
+type TBSharedConnectionInitializationMessage = {
+ +type: 'sessionRequest',
+ +deviceId: string,
+ +accessToken: string,
+ +deviceAppVersion?: string,
+ +userId: string,
+};
+
+// Initialization message for a keyserver device (`deviceType: 'keyserver'`).
+export type TBKeyserverConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'keyserver',
+};
+
+// Initialization message for a web or mobile client device.
+export type TBClientConnectionInitializationMessage = {
+ ...TBSharedConnectionInitializationMessage,
+ +deviceType: 'web' | 'mobile',
+};
+
+// Client initialization message that additionally carries a notification
+// token and the platform that token belongs to.
+export type TBNotifyClientConnectionInitializationMessage = {
+ ...TBClientConnectionInitializationMessage,
+ +notifyToken: string,
+ +notifyPlatform: 'apns' | 'fcm' | 'web' | 'wns',
+};
+
+// Any of the connection initialization message shapes defined in this file.
+export type TBConnectionInitializationMessage =
+ | TBKeyserverConnectionInitializationMessage
+ | TBClientConnectionInitializationMessage
+ | TBNotifyClientConnectionInitializationMessage;
diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs
--- a/services/commtest/tests/tunnelbroker_integration_test.rs
+++ b/services/commtest/tests/tunnelbroker_integration_test.rs
@@ -20,6 +20,7 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -14,7 +14,7 @@
#[arg(long, default_value_t = 51001)]
pub http_port: u16,
/// AMQP server URI
- #[arg(long, default_value_t = String::from("amqp://localhost:5672"))]
+ #[arg(long, default_value_t = String::from("amqp://comm:comm@localhost:5672"))]
pub amqp_uri: String,
/// AWS Localstack service URL
#[arg(env = "LOCALSTACK_ENDPOINT")]
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -49,7 +49,7 @@
let serialized_message = serde_json::from_str::<Messages>(message)?;
match serialized_message {
- Messages::SessionRequest(mut session_info) => {
+ Messages::ConnectionInitializationMessage(mut session_info) => {
let device_info = DeviceInfo {
device_id: session_info.device_id.clone(),
notify_token: session_info.notify_token.take(),
@@ -110,20 +110,12 @@
}
pub async fn handle_websocket_frame_from_device(
- &mut self,
- frame: Message,
+ &self,
+ msg: Message,
) -> Result<(), SessionError> {
- match frame {
- Message::Text(payload) => {
- debug!("Received message from device: {}", payload);
- Ok(())
- }
- Message::Close(_) => {
- self.close().await;
- Ok(())
- }
- _ => Err(SessionError::InvalidMessage),
- }
+ debug!("Received frame: {:?}", msg);
+
+ Ok(())
}
pub async fn next_amqp_message(
diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs
--- a/shared/tunnelbroker_messages/src/messages/mod.rs
+++ b/shared/tunnelbroker_messages/src/messages/mod.rs
@@ -11,5 +11,5 @@
#[serde(untagged)]
pub enum Messages {
RefreshKeysRequest(RefreshKeyRequest),
- SessionRequest(SessionRequest),
+ ConnectionInitializationMessage(ConnectionInitializationMessage),
}
diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs
--- a/shared/tunnelbroker_messages/src/messages/session.rs
+++ b/shared/tunnelbroker_messages/src/messages/session.rs
@@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
/// The workflow when establishing a tunnelbroker connection:
-/// - Client sends SessionRequest
+/// - Client sends ConnectionInitializationMessage
/// - Tunnelbroker validates access_token with identity service
/// - Tunnelbroker emits an AMQP message declaring that it has opened a new
/// connection with a given device, so that the respective tunnelbroker
@@ -31,9 +31,10 @@
/// service before continuing with the request.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
-pub struct SessionRequest {
+pub struct ConnectionInitializationMessage {
pub device_id: String,
pub access_token: String,
+ pub user_id: String,
pub notify_token: Option<String>,
pub device_type: DeviceTypes,
pub device_app_version: Option<String>,
@@ -41,7 +42,7 @@
}
#[derive(Serialize, Deserialize)]
-pub struct SessionResponse {
+pub struct ConnectionInitializationResponse {
pub session_id: String,
}
@@ -55,11 +56,13 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
let request =
- serde_json::from_str::<SessionRequest>(example_payload).unwrap();
+ serde_json::from_str::<ConnectionInitializationMessage>(example_payload)
+ .unwrap();
assert_eq!(request.device_id, "foo");
assert_eq!(request.access_token, "xkdeifjsld");
assert_eq!(request.device_os, None);
diff --git a/yarn.lock b/yarn.lock
--- a/yarn.lock
+++ b/yarn.lock
@@ -24407,6 +24407,11 @@
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
+ws@^8.13.0:
+ version "8.13.0"
+ resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0"
+ integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==
+
ws@^8.4.2:
version "8.12.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Nov 25, 7:03 PM (20 h, 9 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2580744
Default Alt Text
D7691.id30012.diff (9 KB)
Attached To
Mode
D7691: [Keyserver] Open websocket connection with tunnelbroker
Attached
Detach File
Event Timeline
Log In to Comment