Page MenuHomePhabricator

D8749.id30367.diff
No OneTemporary

D8749.id30367.diff

diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs
new file mode 100644
--- /dev/null
+++ b/services/commtest/tests/identity_tunnelbroker_tests.rs
@@ -0,0 +1,110 @@
+mod client {
+ tonic::include_proto!("identity.client");
+}
+mod auth_proto {
+ tonic::include_proto!("identity.authenticated");
+}
+use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient;
+use client::identity_client_service_client::IdentityClientServiceClient;
+use client::UploadOneTimeKeysRequest;
+use commtest::identity::device::create_device;
+use futures_util::SinkExt;
+use futures_util::StreamExt;
+use tokio_tungstenite::{connect_async, tungstenite::Message};
+use tonic::transport::Endpoint;
+use tonic::Request;
+use tunnelbroker_messages::{
+ ConnectionInitializationMessage, DeviceTypes, RefreshKeyRequest,
+};
+
+/// Verifies that depleting a device's one-time keys makes the identity
+/// service send a `RefreshKeyRequest` to that device through tunnelbroker.
+#[tokio::test]
+async fn test_refresh_keys_request_upon_depletion() {
+  let device_info = create_device().await;
+
+  let mut identity_client =
+    IdentityClientServiceClient::connect("http://127.0.0.1:50054")
+      .await
+      .expect("Couldn't connect to identity service");
+
+  // Upload a single content key and a single notif key for the device.
+  let upload_request = UploadOneTimeKeysRequest {
+    user_id: device_info.user_id.clone(),
+    device_id: device_info.device_id.clone(),
+    access_token: device_info.access_token.clone(),
+    content_one_time_pre_keys: vec!["content1".to_string()],
+    notif_one_time_pre_keys: vec!["notif1".to_string()],
+  };
+  identity_client
+    .upload_one_time_keys(upload_request)
+    .await
+    .expect("Failed to upload one-time keys");
+
+  // Request outbound keys, which should make identity service ask for more keys
+  let channel = Endpoint::from_static("http://[::1]:50054")
+    .connect()
+    .await
+    .expect("Couldn't open channel to identity service");
+
+  // Attach the device's credentials to every authenticated request.
+  let mut client =
+    AuthClient::with_interceptor(channel, |mut inter_request: Request<()>| {
+      let metadata = inter_request.metadata_mut();
+      metadata.insert("user_id", device_info.user_id.parse().unwrap());
+      metadata.insert("device_id", device_info.device_id.parse().unwrap());
+      metadata
+        .insert("access_token", device_info.access_token.parse().unwrap());
+      Ok(inter_request)
+    });
+
+  let keyserver_request = auth_proto::OutboundKeysForUserRequest {
+    user_id: device_info.user_id.clone(),
+  };
+  println!("Getting keyserver info for user, {}", device_info.user_id);
+  client
+    .get_keyserver_keys(keyserver_request)
+    .await
+    .expect("Keyserver keys request failed")
+    .into_inner()
+    .keyserver_info
+    .expect("Response is missing keyserver info");
+
+  // The current threshold is 5, but we only uploaded two keys, so tunnelbroker
+  // should relay a refresh request. Create session as a keyserver to get it.
+  let (mut socket, _) = connect_async("ws://localhost:51001")
+    .await
+    .expect("Can't connect");
+
+  let session_request = ConnectionInitializationMessage {
+    device_id: device_info.device_id.to_string(),
+    access_token: device_info.access_token.to_string(),
+    user_id: device_info.user_id.to_string(),
+    notify_token: None,
+    device_type: DeviceTypes::Keyserver,
+    device_app_version: None,
+    device_os: None,
+  };
+
+  let serialized_request = serde_json::to_string(&session_request)
+    .expect("Failed to serialize connection request");
+
+  socket
+    .send(Message::Text(serialized_request))
+    .await
+    .expect("Failed to send message");
+
+  // A missing message must fail the test rather than skip the assertion.
+  let response = socket
+    .next()
+    .await
+    .expect("Socket closed before a refresh request arrived")
+    .expect("Failed to read message from socket");
+  let refresh_request: RefreshKeyRequest =
+    serde_json::from_str(response.to_text().unwrap()).unwrap();
+  let expected_request = RefreshKeyRequest {
+    device_id: device_info.device_id.to_string(),
+    number_of_keys: 5,
+  };
+  assert_eq!(refresh_request, expected_request);
+}
diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -14,7 +14,7 @@
#[arg(long, default_value_t = 51001)]
pub http_port: u16,
/// AMQP server URI
- #[arg(long, default_value_t = String::from("amqp://localhost:5672"))]
+ #[arg(long, default_value_t = String::from("amqp://comm:comm@localhost:5672"))]
pub amqp_uri: String,
/// AWS Localstack service URL
#[arg(env = "LOCALSTACK_ENDPOINT")]
diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs
--- a/shared/tunnelbroker_messages/src/messages/mod.rs
+++ b/shared/tunnelbroker_messages/src/messages/mod.rs
@@ -11,5 +11,5 @@
#[serde(untagged)]
pub enum Messages {
RefreshKeysRequest(RefreshKeyRequest),
- SessionRequest(SessionRequest),
+ SessionRequest(ConnectionInitializationMessage),
}
diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs
--- a/shared/tunnelbroker_messages/src/messages/session.rs
+++ b/shared/tunnelbroker_messages/src/messages/session.rs
@@ -2,7 +2,7 @@
use serde::{Deserialize, Serialize};
-/// The workflow when estabilishing a tunnelbroker connection:
+/// The workflow when establishing a tunnelbroker connection:
/// - Client sends SessionRequest
/// - Tunnelbroker validates access_token with identity service
/// - Tunnelbroker emits an AMQP message declaring that it has opened a new
@@ -31,7 +31,8 @@
/// service before continuing with the request.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
-pub struct SessionRequest {
+pub struct ConnectionInitializationMessage {
+ pub user_id: String,
pub device_id: String,
pub access_token: String,
pub notify_token: Option<String>,
@@ -55,11 +56,13 @@
"type": "sessionRequest",
"accessToken": "xkdeifjsld",
"deviceId": "foo",
+ "userId": "alice",
"deviceType": "keyserver"
}"#;
let request =
- serde_json::from_str::<SessionRequest>(example_payload).unwrap();
+ serde_json::from_str::<ConnectionInitializationMessage>(example_payload)
+ .unwrap();
assert_eq!(request.device_id, "foo");
assert_eq!(request.access_token, "xkdeifjsld");
assert_eq!(request.device_os, None);

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 4:50 AM (17 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2566770
Default Alt Text
D8749.id30367.diff (6 KB)

Event Timeline