diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs index 556c37d7f..7914ed2a9 100644 --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -1,103 +1,111 @@ use comm_opaque2::client::Registration; use grpc_clients::identity::get_unauthenticated_client; use rand::{distributions::Alphanumeric, Rng}; use crate::identity::olm_account_infos::{ ClientPublicKeys, DEFAULT_CLIENT_KEYS, }; use crate::service_addr; use grpc_clients::identity::protos::unauth::{ DeviceKeyUpload, DeviceType, IdentityKeyInfo, Prekey, RegistrationFinishRequest, RegistrationStartRequest, }; pub const PLACEHOLDER_CODE_VERSION: u64 = 0; pub const DEVICE_TYPE: &str = "service"; pub struct DeviceInfo { pub username: String, pub user_id: String, pub device_id: String, pub access_token: String, } -pub async fn create_device(keys: Option<&ClientPublicKeys>) -> DeviceInfo { +/// Register a new user with a device. +/// - Gives random username (returned by function). +/// - Device type defaults to keyserver. +/// - Device ID taken from `keys` (ed25519), see [`DEFAULT_CLIENT_KEYS`] +pub async fn register_user_device( + keys: Option<&ClientPublicKeys>, + device_type: Option, +) -> DeviceInfo { let password = "pass"; let username: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); // TODO: Generate dynamic valid olm account info let keys = keys.unwrap_or_else(|| &DEFAULT_CLIENT_KEYS); let example_payload = serde_json::to_string(&keys).expect("Failed to serialize example payload"); // The ed25519 value from the olm payload let device_id = &keys.primary_identity_public_keys.ed25519; + let device_type = device_type.unwrap_or(DeviceType::Keyserver); let mut client_registration = Registration::new(); let opaque_registration_request = client_registration.start(password).unwrap(); let registration_start_request = RegistrationStartRequest { opaque_registration_request, username: username.to_string(), device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: example_payload.to_string(), payload_signature: "foo".to_string(), social_proof: None, }), content_upload: Some(Prekey { prekey: "content_prekey".to_string(), prekey_signature: "content_prekey_sig".to_string(), }), notif_upload: Some(Prekey { prekey: "notif_prekey".to_string(), prekey_signature: "notif_prekey_sig".to_string(), }), one_time_content_prekeys: Vec::new(), one_time_notif_prekeys: Vec::new(), - device_type: DeviceType::Keyserver.into(), + device_type: device_type.into(), }), }; let mut identity_client = get_unauthenticated_client( &service_addr::IDENTITY_GRPC.to_string(), PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let registration_start_response = identity_client .register_password_user_start(registration_start_request) .await .unwrap() .into_inner(); let opaque_registration_upload = client_registration .finish( password, ®istration_start_response.opaque_registration_response, ) .unwrap(); let registration_finish_request = RegistrationFinishRequest { session_id: registration_start_response.session_id, opaque_registration_upload, }; let registration_finish_response = identity_client .register_password_user_finish(registration_finish_request) .await .unwrap() .into_inner(); DeviceInfo { username: username.to_string(), device_id: device_id.to_string(), user_id: registration_finish_response.user_id, access_token: registration_finish_response.access_token, } } diff --git a/services/commtest/tests/grpc_client_test.rs b/services/commtest/tests/grpc_client_test.rs index 7d64c395a..ae91a89ee 100644 --- a/services/commtest/tests/grpc_client_test.rs +++ b/services/commtest/tests/grpc_client_test.rs @@ -1,36 +1,36 @@ -use commtest::{identity::device::create_device, service_addr}; +use commtest::{identity::device::register_user_device, service_addr}; #[tokio::test] async fn verify_access_token() { use grpc_clients::identity::unauthenticated::client::verify_user_access_token; - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let code_version = 1000; let device_type = "android"; let token_valid = verify_user_access_token( &service_addr::IDENTITY_GRPC.to_string(), &device_info.user_id, &device_info.device_id, &device_info.access_token, code_version, device_type.to_string(), ) .await .expect("Failed to call identity's verify_user_access_token endpoint"); assert!(token_valid); // Try again with invalid access token let token_valid = verify_user_access_token( &service_addr::IDENTITY_GRPC.to_string(), &device_info.user_id, &device_info.device_id, "garbage", code_version, device_type.to_string(), ) .await .expect("Failed to call identity's verify_user_access_token endpoint"); assert!(!token_valid); } diff --git a/services/commtest/tests/identity_access_tokens_tests.rs b/services/commtest/tests/identity_access_tokens_tests.rs index 5de0d30c1..5eae57d5c 100644 --- a/services/commtest/tests/identity_access_tokens_tests.rs +++ b/services/commtest/tests/identity_access_tokens_tests.rs @@ -1,34 +1,34 @@ use commtest::identity::device::{ - create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, + register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use grpc_clients::identity::{ get_unauthenticated_client, protos::unauth::VerifyUserAccessTokenRequest, }; #[tokio::test] async fn verify_access_token() { let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut identity_client = get_unauthenticated_client( &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let verify_request = VerifyUserAccessTokenRequest { user_id: device_info.user_id, device_id: device_info.device_id, access_token: device_info.access_token, }; let response = identity_client .verify_user_access_token(verify_request) .await .unwrap(); assert!(response.into_inner().token_valid); } diff --git a/services/commtest/tests/identity_integration_tests.rs b/services/commtest/tests/identity_integration_tests.rs index c43db4410..049ef8f71 100644 --- a/services/commtest/tests/identity_integration_tests.rs +++ b/services/commtest/tests/identity_integration_tests.rs @@ -1,40 +1,40 @@ use commtest::identity::device::{ - create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, + register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use grpc_clients::identity::protos::authenticated::find_user_id_request::Identifier; use grpc_clients::identity::{ get_auth_client, protos::authenticated::FindUserIdRequest, }; #[tokio::test] async fn find_user_id_by_username() { - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), device_info.user_id.clone(), device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let request = FindUserIdRequest { identifier: Some(Identifier::Username(device_info.username)), }; let response = client .find_user_id(request) .await .expect("Request failed") .into_inner(); assert_eq!( response.user_id, Some(device_info.user_id), "User ID should match" ); } diff --git a/services/commtest/tests/identity_keyserver_tests.rs b/services/commtest/tests/identity_keyserver_tests.rs index 885558d07..6afaed70e 100644 --- a/services/commtest/tests/identity_keyserver_tests.rs +++ b/services/commtest/tests/identity_keyserver_tests.rs @@ -1,72 +1,72 @@ use commtest::identity::device::{ - create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, + register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, protos::authenticated::{ OutboundKeysForUserRequest, UploadOneTimeKeysRequest, }, }; #[tokio::test] async fn set_prekey() { let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut client = get_auth_client( &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { content_one_time_prekeys: vec!["content1".to_string()], notif_one_time_prekeys: vec!["notif1".to_string()], }; client .upload_one_time_keys(upload_request) .await .expect("Failed to upload keys"); // Currently allowed to request your own outbound keys let keyserver_request = OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); let first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); assert_eq!( first_reponse.one_time_content_prekey, Some("content1".to_string()) ); assert_eq!( first_reponse.one_time_notif_prekey, Some("notif1".to_string()) ); let second_reponse = client .get_keyserver_keys(keyserver_request) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The one time keys should be exhausted assert_eq!(second_reponse.one_time_content_prekey, None); assert_eq!(second_reponse.one_time_notif_prekey, None); } diff --git a/services/commtest/tests/identity_one_time_key_tests.rs b/services/commtest/tests/identity_one_time_key_tests.rs index bb0c973eb..08d94f2e2 100644 --- a/services/commtest/tests/identity_one_time_key_tests.rs +++ b/services/commtest/tests/identity_one_time_key_tests.rs @@ -1,36 +1,36 @@ use commtest::identity::device::{ - create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, + register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, protos::authenticated::UploadOneTimeKeysRequest, }; #[tokio::test] async fn upload_one_time_keys() { - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut identity_client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), device_info.user_id, device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { content_one_time_prekeys: vec![ "content1".to_string(), "content2".to_string(), ], notif_one_time_prekeys: vec!["notif1".to_string(), "notif2".to_string()], }; identity_client .upload_one_time_keys(upload_request) .await .unwrap(); } diff --git a/services/commtest/tests/identity_prekey_tests.rs b/services/commtest/tests/identity_prekey_tests.rs index f99382778..5cc2dda7a 100644 --- a/services/commtest/tests/identity_prekey_tests.rs +++ b/services/commtest/tests/identity_prekey_tests.rs @@ -1,41 +1,41 @@ use commtest::identity::device::{ - create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, + register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, protos::{authenticated::RefreshUserPrekeysRequest, unauth::Prekey}, }; #[tokio::test] async fn set_prekey() { - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), device_info.user_id, device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = RefreshUserPrekeysRequest { new_content_prekeys: Some(Prekey { prekey: "content_prekey".to_string(), prekey_signature: "content_prekey_signature".to_string(), }), new_notif_prekeys: Some(Prekey { prekey: "content_prekey".to_string(), prekey_signature: "content_prekey_signature".to_string(), }), }; // This send will fail if the one-time keys weren't successfully added println!( "Error: {:?}", client.refresh_user_prekeys(upload_request).await ); } diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs index 6f78ddc07..346572580 100644 --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -1,88 +1,88 @@ use commtest::identity::device::{ - create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, + register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use commtest::tunnelbroker::socket::{create_socket, receive_message}; use futures_util::StreamExt; use grpc_clients::identity::get_auth_client; use grpc_clients::identity::protos::authenticated::{ OutboundKeysForUserRequest, UploadOneTimeKeysRequest, }; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn test_tunnelbroker_invalid_auth() { - let mut device_info = create_device(None).await; + let mut device_info = register_user_device(None, None).await; device_info.access_token = "".to_string(); let socket = create_socket(&device_info).await; assert!(matches!(socket, Result::Err(_))) } #[tokio::test] async fn test_tunnelbroker_valid_auth() { - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut socket = create_socket(&device_info).await.unwrap(); socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_refresh_keys_request_upon_depletion() { let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; // Request outbound keys, which should trigger identity service to ask for more keys let mut client = get_auth_client( &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { content_one_time_prekeys: vec!["content1".to_string()], notif_one_time_prekeys: vec!["notif1".to_string()], }; client.upload_one_time_keys(upload_request).await.unwrap(); let keyserver_request = OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); let _first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The current threshold is 5, but we only upload two. Should receive request // from Tunnelbroker to refresh keys // Create session as a keyserver - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut socket = create_socket(&device_info).await.unwrap(); for _ in 0..2 { let response = receive_message(&mut socket).await.unwrap(); let serialized_response: RefreshKeyRequest = serde_json::from_str(&response).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; assert_eq!(serialized_response, expected_response); } } diff --git a/services/commtest/tests/tunnelbroker_heartbeat_tests.rs b/services/commtest/tests/tunnelbroker_heartbeat_tests.rs index 3c9b6e628..eb6760814 100644 --- a/services/commtest/tests/tunnelbroker_heartbeat_tests.rs +++ b/services/commtest/tests/tunnelbroker_heartbeat_tests.rs @@ -1,85 +1,85 @@ -use commtest::identity::device::create_device; +use commtest::identity::device::register_user_device; use commtest::tunnelbroker::socket::create_socket; use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message::Close; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tunnelbroker_messages::Heartbeat; async fn receive_and_parse_message( socket: &mut WebSocketStream>, ) -> Heartbeat { if let Some(Ok(response)) = socket.next().await { let message = response .to_text() .expect("Unable to retrieve response content"); serde_json::from_str::(message) .expect("Unable to parse Heartbeat from response") } else { panic!("Received incorrect message type.") } } #[tokio::test] async fn test_receiving() { - let client = create_device(None).await; + let client = register_user_device(None, None).await; let mut socket = create_socket(&client).await.unwrap(); let message_to_device = receive_and_parse_message(&mut socket).await; assert_eq!(message_to_device, Heartbeat {}); socket .send(Close(None)) .await .expect("Failed to close socket"); } #[tokio::test] async fn test_responding() { - let client = create_device(None).await; + let client = register_user_device(None, None).await; let mut socket = create_socket(&client).await.unwrap(); let message_to_device = receive_and_parse_message(&mut socket).await; assert_eq!(message_to_device, Heartbeat {}); let heartbeat = Heartbeat {}; let serialized = serde_json::to_string(&heartbeat).unwrap(); socket .send(Message::Text(serialized)) .await .expect("Error while sending heartbeat"); // Receive and parse another heartbeat message let message_to_device = receive_and_parse_message(&mut socket).await; assert_eq!(message_to_device, Heartbeat {}); socket .send(Close(None)) .await .expect("Failed to close the socket"); } #[tokio::test] async fn test_closing() { - let client = create_device(None).await; + let client = register_user_device(None, None).await; let mut socket = create_socket(&client).await.unwrap(); let message_to_device = receive_and_parse_message(&mut socket).await; assert_eq!(message_to_device, Heartbeat {}); // The next message should be a Close message because we did not respond // to the Heartbeat. // This suggests that the Tunnelbroker might consider the connection // as unhealthy and decide to close it. if let Some(Ok(response)) = socket.next().await { assert_eq!(response, Message::Close(None)) } else { panic!("Received incorrect message type. Expected Close.") } } diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs index c688628fe..67f3b01b8 100644 --- a/services/commtest/tests/tunnelbroker_integration_tests.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -1,95 +1,95 @@ mod proto { tonic::include_proto!("tunnelbroker"); } -use commtest::identity::device::create_device; +use commtest::identity::device::register_user_device; use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; use commtest::service_addr; use commtest::tunnelbroker::socket::{ create_socket, receive_message, send_message, WebSocketMessageToDevice, }; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use std::time::Duration; use tokio::time::sleep; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn send_refresh_request() { // Create session as a keyserver - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; let mut socket = create_socket(&device_info).await.unwrap(); // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); let refresh_request = RefreshKeyRequest { device_id: device_info.device_id.clone(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.clone(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Have keyserver receive any websocket messages let response = receive_message(&mut socket).await.unwrap(); // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response).unwrap(); assert_eq!(serialized_response, refresh_request); } #[tokio::test] async fn test_messages_order() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; let messages = vec![ WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "first message".to_string(), }, WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "second message".to_string(), }, WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "third message".to_string(), }, ]; let mut sender_socket = create_socket(&sender).await.unwrap(); for msg in messages.clone() { send_message(&mut sender_socket, msg).await.unwrap(); } // Wait a specified duration to ensure that message had time to persist sleep(Duration::from_millis(100)).await; let mut receiver_socket = create_socket(&receiver).await.unwrap(); for msg in messages { let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(msg.payload, response); } } diff --git a/services/commtest/tests/tunnelbroker_persist_tests.rs b/services/commtest/tests/tunnelbroker_persist_tests.rs index f01508178..faeec4c92 100644 --- a/services/commtest/tests/tunnelbroker_persist_tests.rs +++ b/services/commtest/tests/tunnelbroker_persist_tests.rs @@ -1,83 +1,83 @@ mod proto { tonic::include_proto!("tunnelbroker"); } -use commtest::identity::device::create_device; +use commtest::identity::device::register_user_device; use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; use commtest::service_addr; use commtest::tunnelbroker::socket::{ create_socket, receive_message, send_message, WebSocketMessageToDevice, }; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use std::time::Duration; use tokio::time::sleep; use tunnelbroker_messages::RefreshKeyRequest; /// Tests that a message to an offline device gets pushed to dynamodb /// then recalled once a device connects #[tokio::test] async fn persist_grpc_messages() { - let device_info = create_device(None).await; + let device_info = register_user_device(None, None).await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); let refresh_request = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.to_string(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Wait a specified duration to ensure that message had time to persist sleep(Duration::from_millis(100)).await; let mut socket = create_socket(&device_info).await.unwrap(); // Have keyserver receive any websocket messages let response = receive_message(&mut socket).await.unwrap(); // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response).unwrap(); assert_eq!(serialized_response, refresh_request); } #[tokio::test] async fn persist_websocket_messages() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; // Send message to not connected client let mut sender_socket = create_socket(&sender).await.unwrap(); let request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "persisted message".to_string(), }; send_message(&mut sender_socket, request.clone()) .await .unwrap(); // Wait a specified duration to ensure that message had time to persist sleep(Duration::from_millis(100)).await; let mut receiver_socket = create_socket(&receiver).await.unwrap(); let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(request.payload, response); } diff --git a/services/commtest/tests/tunnelbroker_recipient_confirmation_tests.rs b/services/commtest/tests/tunnelbroker_recipient_confirmation_tests.rs index 6055588d1..46be4c008 100644 --- a/services/commtest/tests/tunnelbroker_recipient_confirmation_tests.rs +++ b/services/commtest/tests/tunnelbroker_recipient_confirmation_tests.rs @@ -1,197 +1,197 @@ -use commtest::identity::device::create_device; +use commtest::identity::device::register_user_device; use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; use commtest::tunnelbroker::socket::{ create_socket, receive_message, send_message, WebSocketMessageToDevice, }; use futures_util::{SinkExt, StreamExt}; use std::time::Duration; use tokio::time::sleep; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message::Close; use tunnelbroker_messages::MessageToDevice; #[tokio::test] async fn deliver_until_confirmation_not_connected() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; // send message to not connected client let mut sender_socket = create_socket(&sender).await.unwrap(); let request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "message from deliver_until_confirmation_not_connected" .to_string(), }; send_message(&mut sender_socket, request.clone()) .await .unwrap(); // wait a specified duration to ensure that message had time to persist sleep(Duration::from_millis(100)).await; let mut receiver_socket = create_socket(&receiver).await.unwrap(); // receive message for the first time (without confirmation) let Some(Ok(response)) = receiver_socket.next().await else { panic!("Receiving first message failed") }; let message = response.to_text().unwrap(); let message_to_device = serde_json::from_str::(message).unwrap(); assert_eq!(request.payload, message_to_device.payload); // restart connection receiver_socket .send(Close(None)) .await .expect("Failed to send message"); receiver_socket = create_socket(&receiver).await.unwrap(); // receive message for the second time let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(request.payload, response); } #[tokio::test] async fn deliver_until_confirmation_connected() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; // send message to connected client let mut receiver_socket = create_socket(&receiver).await.unwrap(); let mut sender_socket = create_socket(&sender).await.unwrap(); let request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "message from deliver_until_confirmation_connected".to_string(), }; send_message(&mut sender_socket, request.clone()) .await .unwrap(); // receive message for the first time (without confirmation) let Some(Ok(response)) = receiver_socket.next().await else { panic!("Receiving first message failed") }; let message = response.to_text().unwrap(); let message_to_device = serde_json::from_str::(message).unwrap(); assert_eq!(request.payload, message_to_device.payload); // restart connection receiver_socket .send(Close(None)) .await .expect("Failed to send message"); receiver_socket = create_socket(&receiver).await.unwrap(); // receive message for the second time let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(request.payload, response); } #[tokio::test] async fn test_confirming_deleted_message() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; // send message to connected client let mut receiver_socket = create_socket(&receiver).await.unwrap(); let mut sender_socket = create_socket(&sender).await.unwrap(); let request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "message to bo confirmed twice".to_string(), }; send_message(&mut sender_socket, request.clone()) .await .unwrap(); // receive a message let Some(Ok(response)) = receiver_socket.next().await else { panic!("Receiving first message failed") }; let message = response.to_text().unwrap(); let message_to_device = serde_json::from_str::(message).unwrap(); assert_eq!(request.payload, message_to_device.payload); let confirmation = tunnelbroker_messages::MessageReceiveConfirmation { message_ids: vec![message_to_device.message_id], }; let serialized_confirmation = serde_json::to_string(&confirmation).unwrap(); // send confirmation twice receiver_socket .send(Message::Text(serialized_confirmation.clone())) .await .expect("Error while sending confirmation"); receiver_socket .send(Message::Text(serialized_confirmation)) .await .expect("Error while sending confirmation"); // test if socket is still alive by sending and receiving a message let second_request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "second request".to_string(), }; send_message(&mut sender_socket, second_request.clone()) .await .unwrap(); let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(second_request.payload, response); } #[tokio::test] async fn test_confirming() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; // send message to connected client let mut receiver_socket = create_socket(&receiver).await.unwrap(); let mut sender_socket = create_socket(&sender).await.unwrap(); // send first message let first_request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "first request".to_string(), }; send_message(&mut sender_socket, first_request.clone()) .await .unwrap(); // receive a first message let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(first_request.payload, response); // restart connection receiver_socket .send(Close(None)) .await .expect("Failed to send message"); tokio::time::sleep(Duration::from_millis(200)).await; receiver_socket = create_socket(&receiver).await.unwrap(); // send second message let second_request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "second request".to_string(), }; send_message(&mut sender_socket, second_request.clone()) .await .unwrap(); // make sure only second message is received let response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(second_request.payload, response) } diff --git a/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs b/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs index b83565e06..23f0f3ada 100644 --- a/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs +++ b/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs @@ -1,93 +1,93 @@ -use commtest::identity::device::create_device; +use commtest::identity::device::register_user_device; use commtest::identity::olm_account_infos::{ DEFAULT_CLIENT_KEYS, MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; use commtest::tunnelbroker::socket::{create_socket, receive_message}; use futures_util::{SinkExt, StreamExt}; use tokio_tungstenite::tungstenite::Message; use tunnelbroker_messages::{ MessageSentStatus, MessageToDeviceRequest, MessageToDeviceRequestStatus, }; /// Tests of responses sent from Tunnelberoker to client /// trying to send message to other device #[tokio::test] async fn get_confirmation() { - let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; - let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + let sender = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; + let receiver = register_user_device(Some(&MOCK_CLIENT_KEYS_2), None).await; let client_message_id = "mockID".to_string(); // Send message to not connected client let payload = "persisted message"; let request = MessageToDeviceRequest { client_message_id: client_message_id.clone(), device_id: receiver.device_id.clone(), payload: payload.to_string(), }; let serialized_request = serde_json::to_string(&request) .expect("Failed to serialize message to device"); let mut sender_socket = create_socket(&sender).await.unwrap(); sender_socket .send(Message::Text(serialized_request)) .await .expect("Failed to send message"); if let Some(Ok(response)) = sender_socket.next().await { let expected_response = MessageToDeviceRequestStatus { client_message_ids: vec![MessageSentStatus::Success(client_message_id)], }; let expected_payload = serde_json::to_string(&expected_response).unwrap(); let received_payload = response.to_text().unwrap(); assert_eq!(received_payload, expected_payload); }; // Connect receiver to flush DDB and avoid polluting other tests let mut receiver_socket = create_socket(&receiver).await.unwrap(); let receiver_response = receive_message(&mut receiver_socket).await.unwrap(); assert_eq!(payload, receiver_response); } #[tokio::test] async fn get_serialization_error() { - let sender = create_device(Some(&DEFAULT_CLIENT_KEYS)).await; + let sender = register_user_device(Some(&DEFAULT_CLIENT_KEYS), None).await; let message = "some bad json".to_string(); let mut sender_socket = create_socket(&sender).await.unwrap(); sender_socket .send(Message::Text(message.clone())) .await .expect("Failed to send message"); if let Some(Ok(response)) = sender_socket.next().await { let expected_response = MessageToDeviceRequestStatus { client_message_ids: vec![MessageSentStatus::SerializationError(message)], }; let expected_payload = serde_json::to_string(&expected_response).unwrap(); let received_payload = response.to_text().unwrap(); assert_eq!(received_payload, expected_payload); }; } #[tokio::test] async fn get_invalid_request_error() { - let sender = create_device(Some(&DEFAULT_CLIENT_KEYS)).await; + let sender = register_user_device(Some(&DEFAULT_CLIENT_KEYS), None).await; let mut sender_socket = create_socket(&sender).await.unwrap(); sender_socket .send(Message::Binary(vec![])) .await .expect("Failed to send message"); if let Some(Ok(response)) = sender_socket.next().await { let expected_response = MessageToDeviceRequestStatus { client_message_ids: vec![MessageSentStatus::InvalidRequest], }; let expected_payload = serde_json::to_string(&expected_response).unwrap(); let received_payload = response.to_text().unwrap(); assert_eq!(received_payload, expected_payload); }; } diff --git a/services/commtest/tests/tunnelbroker_websocket_messages_tests.rs b/services/commtest/tests/tunnelbroker_websocket_messages_tests.rs index e906accf9..38452c417 100644 --- a/services/commtest/tests/tunnelbroker_websocket_messages_tests.rs +++ b/services/commtest/tests/tunnelbroker_websocket_messages_tests.rs @@ -1,49 +1,49 @@ -use commtest::identity::device::create_device; +use commtest::identity::device::register_user_device; use commtest::identity::olm_account_infos::MOCK_CLIENT_KEYS_1; use commtest::tunnelbroker::socket::create_socket; use futures_util::{SinkExt, StreamExt}; use tokio_tungstenite::tungstenite::{Error, Message, Message::Close}; /// Tests for message types defined in tungstenite crate #[tokio::test] async fn test_ping_pong() { - let device = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; + let device = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; let ping_message = vec![1, 2, 3, 4, 5]; let mut socket = create_socket(&device).await.unwrap(); socket .send(Message::Ping(ping_message.clone())) .await .expect("Failed to send message"); if let Some(Ok(response)) = socket.next().await { let received_payload = match response { Message::Pong(received_payload) => received_payload, unexpected => panic!( "Unexpected message type or result. Expected Pong, got: {:?}. ", unexpected ), }; assert_eq!(ping_message.clone(), received_payload); }; } #[tokio::test] async fn test_close_message() { - let device = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; + let device = register_user_device(Some(&MOCK_CLIENT_KEYS_1), None).await; let mut socket = create_socket(&device).await.unwrap(); socket .send(Close(None)) .await .expect("Failed to send message"); if let Some(response) = socket.next().await { assert!(matches!( response, Err(Error::AlreadyClosed | Error::ConnectionClosed) | Ok(Close(None)) )); }; }