diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs new file mode 100644 --- /dev/null +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -0,0 +1,110 @@ +mod client { + tonic::include_proto!("identity.client"); +} +mod auth_proto { + tonic::include_proto!("identity.authenticated"); +} +use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient; +use client::identity_client_service_client::IdentityClientServiceClient; +use client::UploadOneTimeKeysRequest; +use commtest::identity::device::create_device; +use futures_util::SinkExt; +use futures_util::StreamExt; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tonic::transport::Endpoint; +use tonic::Request; +use tunnelbroker_messages::{ + ConnectionInitializationMessage, DeviceTypes, RefreshKeyRequest, +}; + +#[tokio::test] +async fn test_refresh_keys_request_upon_depletion() { + let device_info = create_device().await; + + let mut identity_client = + IdentityClientServiceClient::connect("http://127.0.0.1:50054") + .await + .expect("Couldn't connect to identitiy service"); + + let upload_request = UploadOneTimeKeysRequest { + user_id: device_info.user_id.clone(), + device_id: device_info.device_id.clone(), + access_token: device_info.access_token.clone(), + content_one_time_pre_keys: vec!["content1".to_string()], + notif_one_time_pre_keys: vec!["notif1".to_string()], + }; + + identity_client + .upload_one_time_keys(upload_request) + .await + .unwrap(); + + // Request outbound keys, which should trigger identity service to ask for more keys + let channel = Endpoint::from_static("http://[::1]:50054") + .connect() + .await + .unwrap(); + + let mut client = + AuthClient::with_interceptor(channel, |mut inter_request: Request<()>| { + let metadata = inter_request.metadata_mut(); + metadata.insert("user_id", device_info.user_id.parse().unwrap()); + metadata.insert("device_id", device_info.device_id.parse().unwrap()); + metadata + .insert("access_token", device_info.access_token.parse().unwrap()); + Ok(inter_request) + }); + + let keyserver_request = auth_proto::OutboundKeysForUserRequest { + user_id: device_info.user_id.clone(), + }; + + println!("Getting keyserver info for user, {}", device_info.user_id); + let first_reponse = client + .get_keyserver_keys(keyserver_request.clone()) + .await + .expect("Second keyserver keys request failed") + .into_inner() + .keyserver_info + .unwrap(); + + // The current threshold is 5, but we only upload two. Should receive request + // from tunnelbroker to refresh keys + // Create session as a keyserver + + let (mut socket, _) = connect_async("ws://localhost:51001") + .await + .expect("Can't connect"); + + let session_request = ConnectionInitializationMessage { + device_id: device_info.device_id.to_string(), + access_token: device_info.access_token.to_string(), + user_id: device_info.user_id.to_string(), + notify_token: None, + device_type: DeviceTypes::Keyserver, + device_app_version: None, + device_os: None, + }; + + let serialized_request = serde_json::to_string(&session_request) + .expect("Failed to serialize connection request"); + + socket + .send(Message::Text(serialized_request)) + .await + .expect("Failed to send message"); + + // Have keyserver receive any websocket messages + if let Some(Ok(response)) = socket.next().await { + // Check that message received by keyserver matches what identity server + // issued + let serialized_response: RefreshKeyRequest = + serde_json::from_str(&response.to_text().unwrap()).unwrap(); + + let expected_response = RefreshKeyRequest { + device_id: device_info.device_id.to_string(), + number_of_keys: 5, + }; + assert_eq!(serialized_response, expected_response); + }; +} diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs --- a/services/tunnelbroker/src/config.rs +++ b/services/tunnelbroker/src/config.rs @@ -14,7 +14,7 @@ #[arg(long, default_value_t = 51001)] pub http_port: u16, /// AMQP server URI - #[arg(long, default_value_t = String::from("amqp://localhost:5672"))] + #[arg(long, default_value_t = String::from("amqp://comm:comm@localhost:5672"))] pub amqp_uri: String, /// AWS Localstack service URL #[arg(env = "LOCALSTACK_ENDPOINT")] diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs --- a/shared/tunnelbroker_messages/src/messages/mod.rs +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -11,5 +11,5 @@ #[serde(untagged)] pub enum Messages { RefreshKeysRequest(RefreshKeyRequest), - SessionRequest(SessionRequest), + SessionRequest(ConnectionInitializationMessage), } diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs --- a/shared/tunnelbroker_messages/src/messages/session.rs +++ b/shared/tunnelbroker_messages/src/messages/session.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; -/// The workflow when estabilishing a tunnelbroker connection: +/// The workflow when establishing a tunnelbroker connection: /// - Client sends SessionRequest /// - Tunnelbroker validates access_token with identity service /// - Tunnelbroker emits an AMQP message declaring that it has opened a new @@ -31,7 +31,8 @@ /// service before continuing with the request. #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] -pub struct SessionRequest { +pub struct ConnectionInitializationMessage { + pub user_id: String, pub device_id: String, pub access_token: String, pub notify_token: Option, @@ -55,11 +56,13 @@ "type": "sessionRequest", "accessToken": "xkdeifjsld", "deviceId": "foo", + "userId": "alice", "deviceType": "keyserver" }"#; let request = - serde_json::from_str::(example_payload).unwrap(); + serde_json::from_str::(example_payload) + .unwrap(); assert_eq!(request.device_id, "foo"); assert_eq!(request.access_token, "xkdeifjsld"); assert_eq!(request.device_os, None);