diff --git a/services/commtest/src/tunnelbroker/mod.rs b/services/commtest/src/tunnelbroker/mod.rs index 8b1378917..d22cc845a 100644 --- a/services/commtest/src/tunnelbroker/mod.rs +++ b/services/commtest/src/tunnelbroker/mod.rs @@ -1 +1 @@ - +pub mod socket; diff --git a/services/commtest/src/tunnelbroker/socket.rs b/services/commtest/src/tunnelbroker/socket.rs new file mode 100644 index 000000000..cb25cf8f6 --- /dev/null +++ b/services/commtest/src/tunnelbroker/socket.rs @@ -0,0 +1,34 @@ +use crate::identity::device::DeviceInfo; +use futures_util::SinkExt; +use tokio::net::TcpStream; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tunnelbroker_messages::{ConnectionInitializationMessage, DeviceTypes}; + +pub async fn create_socket( + device_info: &DeviceInfo, +) -> WebSocketStream> { + let (mut socket, _) = connect_async("ws://localhost:51001") + .await + .expect("Can't connect"); + + let session_request = ConnectionInitializationMessage { + device_id: device_info.device_id.to_string(), + access_token: device_info.access_token.to_string(), + user_id: device_info.user_id.to_string(), + notify_token: None, + device_type: DeviceTypes::Keyserver, + device_app_version: None, + device_os: None, + }; + + let serialized_request = serde_json::to_string(&session_request) + .expect("Failed to serialize connection request"); + + socket + .send(Message::Text(serialized_request)) + .await + .expect("Failed to send message"); + + socket +} diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs index c8eeb6ac2..27f44683e 100644 --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -1,174 +1,114 @@ mod client { tonic::include_proto!("identity.client"); } mod auth_proto { tonic::include_proto!("identity.authenticated"); } use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient; use client::identity_client_service_client::IdentityClientServiceClient; use client::UploadOneTimeKeysRequest; use commtest::identity::device::create_device; +use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; -use futures_util::{SinkExt, TryStreamExt}; -use tokio_tungstenite::{connect_async, tungstenite::Message}; use tonic::transport::Endpoint; use tonic::Request; -use tunnelbroker_messages::{ - ConnectionInitializationMessage, DeviceTypes, RefreshKeyRequest, -}; +use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] #[should_panic] async fn test_tunnelbroker_invalid_auth() { - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - - let session_request = ConnectionInitializationMessage { - device_id: "".to_string(), - access_token: "".to_string(), - user_id: "".to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let mut device_info = create_device().await; + device_info.access_token = "".to_string(); + let mut socket = create_socket(&device_info).await; socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_tunnelbroker_valid_auth() { - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - let device_info = create_device().await; - let session_request = ConnectionInitializationMessage { - device_id: device_info.device_id.to_string(), - access_token: device_info.access_token.to_string(), - user_id: device_info.user_id.to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let mut socket = create_socket(&device_info).await; socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_refresh_keys_request_upon_depletion() { let device_info = create_device().await; let mut identity_client = IdentityClientServiceClient::connect("http://127.0.0.1:50054") .await .expect("Couldn't connect to identitiy service"); let upload_request = UploadOneTimeKeysRequest { user_id: device_info.user_id.clone(), device_id: device_info.device_id.clone(), access_token: device_info.access_token.clone(), content_one_time_pre_keys: vec!["content1".to_string()], notif_one_time_pre_keys: vec!["notif1".to_string()], }; identity_client .upload_one_time_keys(upload_request) .await .unwrap(); // Request outbound keys, which should trigger identity service to ask for more keys let channel = Endpoint::from_static("http://[::1]:50054") .connect() .await .unwrap(); let mut client = AuthClient::with_interceptor(channel, |mut inter_request: Request<()>| { let metadata = inter_request.metadata_mut(); metadata.insert("user_id", device_info.user_id.parse().unwrap()); metadata.insert("device_id", device_info.device_id.parse().unwrap()); metadata .insert("access_token", device_info.access_token.parse().unwrap()); Ok(inter_request) }); let keyserver_request = auth_proto::OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); let first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The current threshold is 5, but we only upload two. Should receive request // from tunnelbroker to refresh keys // Create session as a keyserver - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - - let session_request = ConnectionInitializationMessage { - device_id: device_info.device_id.to_string(), - access_token: device_info.access_token.to_string(), - user_id: device_info.user_id.to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let device_info = create_device().await; + let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; assert_eq!(serialized_response, expected_response); }; } diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs index 871893a3d..87d59474a 100644 --- a/services/commtest/tests/tunnelbroker_integration_test.rs +++ b/services/commtest/tests/tunnelbroker_integration_test.rs @@ -1,130 +1,93 @@ -use futures_util::SinkExt; -use tokio_tungstenite::{connect_async, tungstenite::Message}; mod proto { tonic::include_proto!("tunnelbroker"); } use commtest::identity::device::create_device; +use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use tunnelbroker_messages as messages; -use tunnelbroker_messages::{ - ConnectionInitializationMessage, DeviceTypes, RefreshKeyRequest, -}; +use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn send_refresh_request() { // Create session as a keyserver - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - let device_info = create_device().await; - let session_request = ConnectionInitializationMessage { - device_id: device_info.device_id.to_string(), - access_token: device_info.access_token.to_string(), - user_id: device_info.user_id.to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let mut socket = create_socket(&device_info).await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") .await .unwrap(); let refresh_request = messages::RefreshKeyRequest { device_id: device_info.device_id.clone(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.clone(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Have keyserver receive any websocket messages let response = socket.next().await.unwrap().unwrap(); // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } /// Test that a message to an offline device gets pushed to dynamodb /// then recalled once a device connects #[tokio::test] async fn presist_messages() { + let device_info = create_device().await; + // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") .await .unwrap(); let refresh_request = messages::RefreshKeyRequest { - device_id: "bar".to_string(), + device_id: device_info.device_id.to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { - device_id: "bar".to_string(), + device_id: device_info.device_id.to_string(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Wait one second to ensure that message had time to persist use std::{thread, time}; let ten_millis = time::Duration::from_millis(50); thread::sleep(ten_millis); - // Create session as a keyserver - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - - let session_request = r#"{ - "type": "sessionRequest", - "accessToken": "xkdexfjsld", - "deviceID": "bar", - "deviceType": "keyserver" - }"#; - - socket - .send(Message::Text(session_request.to_string())) - .await - .expect("Failed to send message"); - + let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); }; }