diff --git a/services/commtest/src/tunnelbroker/mod.rs b/services/commtest/src/tunnelbroker/mod.rs --- a/services/commtest/src/tunnelbroker/mod.rs +++ b/services/commtest/src/tunnelbroker/mod.rs @@ -1 +1 @@ - +pub mod socket; diff --git a/services/commtest/src/tunnelbroker/socket.rs b/services/commtest/src/tunnelbroker/socket.rs new file mode 100644 --- /dev/null +++ b/services/commtest/src/tunnelbroker/socket.rs @@ -0,0 +1,34 @@ +use crate::identity::device::DeviceInfo; +use futures_util::SinkExt; +use tokio::net::TcpStream; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tunnelbroker_messages::{ConnectionInitializationMessage, DeviceTypes}; + +pub async fn create_socket( + device_info: &DeviceInfo, +) -> WebSocketStream<MaybeTlsStream<TcpStream>> { + let (mut socket, _) = connect_async("ws://localhost:51001") + .await + .expect("Can't connect"); + + let session_request = ConnectionInitializationMessage { + device_id: device_info.device_id.to_string(), + access_token: device_info.access_token.to_string(), + user_id: device_info.user_id.to_string(), + notify_token: None, + device_type: DeviceTypes::Keyserver, + device_app_version: None, + device_os: None, + }; + + let serialized_request = serde_json::to_string(&session_request) + .expect("Failed to serialize connection request"); + + socket + .send(Message::Text(serialized_request)) + .await + .expect("Failed to send message"); + + socket +} diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -8,39 +8,18 @@ use client::identity_client_service_client::IdentityClientServiceClient; use client::UploadOneTimeKeysRequest; use commtest::identity::device::create_device; +use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; -use futures_util::{SinkExt, 
TryStreamExt}; -use tokio_tungstenite::{connect_async, tungstenite::Message}; use tonic::transport::Endpoint; use tonic::Request; -use tunnelbroker_messages::{ - ConnectionInitializationMessage, DeviceTypes, RefreshKeyRequest, -}; +use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] #[should_panic] async fn test_tunnelbroker_invalid_auth() { - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - - let session_request = ConnectionInitializationMessage { - device_id: "".to_string(), - access_token: "".to_string(), - user_id: "".to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let mut device_info = create_device().await; + device_info.access_token = "".to_string(); + let mut socket = create_socket(&device_info).await; socket .next() @@ -51,28 +30,8 @@ #[tokio::test] async fn test_tunnelbroker_valid_auth() { - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - let device_info = create_device().await; - let session_request = ConnectionInitializationMessage { - device_id: device_info.device_id.to_string(), - access_token: device_info.access_token.to_string(), - user_id: device_info.user_id.to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let mut socket = create_socket(&device_info).await; socket .next() @@ -136,27 +95,8 @@ // from tunnelbroker to refresh keys // Create session as 
a keyserver - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - - let session_request = ConnectionInitializationMessage { - device_id: device_info.device_id.to_string(), - access_token: device_info.access_token.to_string(), - user_id: device_info.user_id.to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let device_info = create_device().await; + let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs --- a/services/commtest/tests/tunnelbroker_integration_test.rs +++ b/services/commtest/tests/tunnelbroker_integration_test.rs @@ -1,42 +1,19 @@ -use futures_util::SinkExt; -use tokio_tungstenite::{connect_async, tungstenite::Message}; mod proto { tonic::include_proto!("tunnelbroker"); } use commtest::identity::device::create_device; +use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use tunnelbroker_messages as messages; -use tunnelbroker_messages::{ - ConnectionInitializationMessage, DeviceTypes, RefreshKeyRequest, -}; +use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn send_refresh_request() { // Create session as a keyserver - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - let device_info = create_device().await; - let session_request = ConnectionInitializationMessage { - device_id: 
device_info.device_id.to_string(), - access_token: device_info.access_token.to_string(), - user_id: device_info.user_id.to_string(), - notify_token: None, - device_type: DeviceTypes::Keyserver, - device_app_version: None, - device_os: None, - }; - - let serialized_request = serde_json::to_string(&session_request) - .expect("Failed to serialize connection request"); - - socket - .send(Message::Text(serialized_request)) - .await - .expect("Failed to send message"); + let mut socket = create_socket(&device_info).await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = @@ -75,6 +52,8 @@ /// then recalled once a device connects #[tokio::test] async fn presist_messages() { + let device_info = create_device().await; + // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") @@ -82,13 +61,13 @@ .unwrap(); let refresh_request = messages::RefreshKeyRequest { - device_id: "bar".to_string(), + device_id: device_info.device_id.to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { - device_id: "bar".to_string(), + device_id: device_info.device_id.to_string(), payload, }; let grpc_message = tonic::Request::new(request); @@ -102,23 +81,7 @@ let ten_millis = time::Duration::from_millis(50); thread::sleep(ten_millis); - // Create session as a keyserver - let (mut socket, _) = connect_async("ws://localhost:51001") - .await - .expect("Can't connect"); - - let session_request = r#"{ - "type": "sessionRequest", - "accessToken": "xkdexfjsld", - "deviceID": "bar", - "deviceType": "keyserver" - }"#; - - socket - .send(Message::Text(session_request.to_string())) - .await - .expect("Failed to send message"); - + let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that 
message received by keyserver matches what identity server