diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs index d82b5b07a..4fc747f99 100644 --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -1,103 +1,103 @@ use comm_opaque2::client::Registration; use grpc_clients::identity::get_unauthenticated_client; use rand::{distributions::Alphanumeric, Rng}; use crate::identity::olm_account_infos::{ ClientPublicKeys, DEFAULT_CLIENT_KEYS, }; +use crate::service_addr; use grpc_clients::identity::protos::client::{ DeviceKeyUpload, DeviceType, IdentityKeyInfo, PreKey, RegistrationFinishRequest, RegistrationStartRequest, }; pub const PLACEHOLDER_CODE_VERSION: u64 = 0; pub const DEVICE_TYPE: &str = "service"; pub struct DeviceInfo { pub username: String, pub user_id: String, pub device_id: String, pub access_token: String, } pub async fn create_device(keys: Option<&ClientPublicKeys>) -> DeviceInfo { let password = "pass"; let username: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); // TODO: Generate dynamic valid olm account info let keys = keys.unwrap_or_else(|| &DEFAULT_CLIENT_KEYS); let example_payload = serde_json::to_string(&keys).expect("Failed to serialize example payload"); // The ed25519 value from the olm payload let device_id = &keys.primary_identity_public_keys.ed25519; let mut client_registration = Registration::new(); let opaque_registration_request = client_registration.start(password).unwrap(); let registration_start_request = RegistrationStartRequest { opaque_registration_request, username: username.to_string(), device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: example_payload.to_string(), payload_signature: "foo".to_string(), social_proof: None, }), content_upload: Some(PreKey { pre_key: "content_prekey".to_string(), pre_key_signature: "content_prekey_sig".to_string(), }), notif_upload: Some(PreKey { pre_key: "notif_prekey".to_string(), pre_key_signature: "notif_prekey_sig".to_string(), }), one_time_content_prekeys: Vec::new(), one_time_notif_prekeys: Vec::new(), device_type: DeviceType::Keyserver.into(), }), }; - // TODO: allow endpoint to be configured let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let registration_start_response = identity_client .register_password_user_start(registration_start_request) .await .unwrap() .into_inner(); let opaque_registration_upload = client_registration .finish( password, ®istration_start_response.opaque_registration_response, ) .unwrap(); let registration_finish_request = RegistrationFinishRequest { session_id: registration_start_response.session_id, opaque_registration_upload, }; let registration_finish_response = identity_client .register_password_user_finish(registration_finish_request) .await .unwrap() .into_inner(); DeviceInfo { username: username.to_string(), device_id: device_id.to_string(), user_id: registration_finish_response.user_id, access_token: registration_finish_response.access_token, } } diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs index 938d68701..1b6efef9c 100644 --- a/services/commtest/src/lib.rs +++ b/services/commtest/src/lib.rs @@ -1,6 +1,7 @@ pub mod backup; pub mod blob; pub mod constants; pub mod identity; +pub mod service_addr; pub mod tools; pub mod tunnelbroker; diff --git 
a/services/commtest/src/service_addr.rs b/services/commtest/src/service_addr.rs
new file mode 100644
index 000000000..9b45ca975
--- /dev/null
+++ b/services/commtest/src/service_addr.rs
@@ -0,0 +1,87 @@
+use std::env;
+use std::fmt::Display;
+
+use tokio_tungstenite::tungstenite;
+
+pub static TUNNELBROKER_WS: &ServiceAddr = &ServiceAddr {
+  scheme: "ws",
+  endpoint_env_var: "TUNNELBROKER_WS_ENDPOINT",
+  port_env_var: "COMM_SERVICES_PORT_TUNNELBROKER_WS",
+  default_port: 51001,
+};
+pub static TUNNELBROKER_GRPC: &ServiceAddr = &ServiceAddr {
+  scheme: "http",
+  endpoint_env_var: "TUNNELBROKER_GRPC_ENDPOINT",
+  port_env_var: "COMM_SERVICES_PORT_TUNNELBROKER",
+  default_port: 50051,
+};
+pub static BACKUP_SERVICE_HTTP: &ServiceAddr = &ServiceAddr {
+  scheme: "http",
+  endpoint_env_var: "BACKUP_SERVICE_URL",
+  port_env_var: "COMM_SERVICES_PORT_BACKUP",
+  default_port: 50052,
+};
+pub static BLOB_SERVICE_HTTP: &ServiceAddr = &ServiceAddr {
+  scheme: "http",
+  endpoint_env_var: "BLOB_SERVICE_URL",
+  port_env_var: "COMM_SERVICES_PORT_BLOB",
+  default_port: 50053,
+};
+pub static IDENTITY_GRPC: &ServiceAddr = &ServiceAddr {
+  scheme: "http",
+  endpoint_env_var: "IDENTITY_GRPC_ENDPOINT",
+  port_env_var: "COMM_SERVICES_PORT_IDENTITY",
+  default_port: 50054,
+};
+
+#[derive(Debug, Clone, Copy)]
+pub struct ServiceAddr {
+  scheme: &'static str,
+  default_port: u16,
+  /// Environment variable name for the endpoint
+  /// that overrides all other values
+  endpoint_env_var: &'static str,
+  /// Environment variable name for the port; overrides default_port.
+  /// If `endpoint_env_var` is set, this is ignored.
+  /// Otherwise, addr is `scheme://localhost:${port_env_var}`
+  port_env_var: &'static str,
+}
+
+impl Display for ServiceAddr {
+  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    if let Ok(endpoint) = env::var(self.endpoint_env_var) {
+      return write!(f, "{}", endpoint);
+    }
+
+    let port = env::var(self.port_env_var)
+      .ok()
+      .and_then(|port| port.parse::<u16>().ok())
+      .unwrap_or(self.default_port);
+
+    write!(f, "{}://localhost:{}", self.scheme, port)
+  }
+}
+
+impl TryFrom<&ServiceAddr> for reqwest::Url {
+  type Error = url::ParseError;
+
+  fn try_from(service_addr: &ServiceAddr) -> Result<Self, Self::Error> {
+    reqwest::Url::parse(&service_addr.to_string())
+  }
+}
+
+impl TryFrom<&ServiceAddr> for tonic::transport::Endpoint {
+  type Error = tonic::transport::Error;
+
+  fn try_from(value: &ServiceAddr) -> Result<Self, Self::Error> {
+    value.to_string().try_into()
+  }
+}
+
+impl tungstenite::client::IntoClientRequest for &ServiceAddr {
+  fn into_client_request(
+    self,
+  ) -> tungstenite::Result<tungstenite::handshake::client::Request> {
+    self.to_string().into_client_request()
+  }
+}
diff --git a/services/commtest/src/tunnelbroker/socket.rs b/services/commtest/src/tunnelbroker/socket.rs
index b8151cc56..8614aa964 100644
--- a/services/commtest/src/tunnelbroker/socket.rs
+++ b/services/commtest/src/tunnelbroker/socket.rs
@@ -1,65 +1,66 @@
 use crate::identity::device::DeviceInfo;
+use crate::service_addr;
 use futures_util::{SinkExt, StreamExt};
 use tokio::net::TcpStream;
 use tokio_tungstenite::tungstenite::Message;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tunnelbroker_messages::{
   ConnectionInitializationMessage, DeviceTypes, MessageSentStatus,
   MessageToDevice, MessageToDeviceRequest, MessageToDeviceRequestStatus,
 };

 pub async fn create_socket(
   device_info: &DeviceInfo,
 ) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
-  let (mut socket, _) = connect_async("ws://localhost:51001")
+  let (mut socket, _) = connect_async(service_addr::TUNNELBROKER_WS)
.await .expect("Can't connect"); let session_request = ConnectionInitializationMessage { device_id: device_info.device_id.to_string(), access_token: device_info.access_token.to_string(), user_id: device_info.user_id.to_string(), notify_token: None, device_type: DeviceTypes::Keyserver, device_app_version: None, device_os: None, }; let serialized_request = serde_json::to_string(&session_request) .expect("Failed to serialize connection request"); socket .send(Message::Text(serialized_request)) .await .expect("Failed to send message"); socket } pub async fn send_message( socket: &mut WebSocketStream>, message: MessageToDevice, ) -> Result> { let client_message_id = uuid::Uuid::new_v4().to_string(); let request = MessageToDeviceRequest { client_message_id: client_message_id.clone(), device_id: message.device_id, payload: message.payload, }; let serialized_request = serde_json::to_string(&request)?; socket.send(Message::Text(serialized_request)).await?; if let Some(Ok(response)) = socket.next().await { let confirmation: MessageToDeviceRequestStatus = serde_json::from_str(response.to_text().unwrap())?; if confirmation .client_message_ids .contains(&MessageSentStatus::Success(client_message_id.clone())) { return Ok(client_message_id); } } Err("Failed to confirm sent message".into()) } diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs index ced5ad3c7..3f9d07eb5 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -1,128 +1,123 @@ use bytesize::ByteSize; use comm_services_lib::{auth::UserIdentity, backup::LatestBackupIDResponse}; use commtest::{ backup::{ backup_utils::BackupData, create_new_backup, pull_backup::{self, BackupDescriptor, RequestedData}, }, + service_addr, tools::{generate_stable_nbytes, Error}, }; use reqwest::StatusCode; -use std::env; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BACKUP_SERVICE_HTTP) + .expect("failed to parse backup service url"); let backup_datas = [ BackupData { backup_id: "b1".to_string(), user_keys_hash: "kh1".to_string(), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(b'a'), ), user_data_hash: "dh1".to_string(), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(b'A'), ), attachments: vec![], }, BackupData { backup_id: "b2".to_string(), user_keys_hash: "kh2".to_string(), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(b'b'), ), user_data_hash: "dh2".to_string(), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(b'B'), ), attachments: vec![], }, ]; let user_identity = UserIdentity { user_id: "1".to_string(), access_token: "dummy access token".to_string(), device_id: "dummy device_id".to_string(), }; create_new_backup::run(url.clone(), &user_identity, &backup_datas[0]).await?; create_new_backup::run(url.clone(), &user_identity, &backup_datas[1]).await?; // Test direct lookup let second_backup_descriptor = BackupDescriptor::BackupID { backup_id: backup_datas[1].backup_id.clone(), user_identity: user_identity.clone(), }; let user_keys = pull_backup::run( url.clone(), 
second_backup_descriptor.clone(), RequestedData::UserKeys, ) .await?; assert_eq!(user_keys, backup_datas[1].user_keys); let user_data = pull_backup::run( url.clone(), second_backup_descriptor.clone(), RequestedData::UserData, ) .await?; assert_eq!(user_data, backup_datas[1].user_data); // Test latest backup lookup let latest_backup_descriptor = BackupDescriptor::Latest { // Initial version of the backup service uses `user_id` in place of a username username: "1".to_string(), }; let backup_id_response = pull_backup::run( url.clone(), latest_backup_descriptor.clone(), RequestedData::BackupID, ) .await?; let response: LatestBackupIDResponse = serde_json::from_slice(&backup_id_response)?; assert_eq!(response.backup_id, backup_datas[1].backup_id); let user_keys = pull_backup::run( url.clone(), latest_backup_descriptor.clone(), RequestedData::UserKeys, ) .await?; assert_eq!(user_keys, backup_datas[1].user_keys); // Test cleanup let first_backup_descriptor = BackupDescriptor::BackupID { backup_id: backup_datas[0].backup_id.clone(), user_identity: user_identity.clone(), }; let response = pull_backup::run( url.clone(), first_backup_descriptor.clone(), RequestedData::UserKeys, ) .await; assert!( matches!(response, Err(Error::HttpStatus(StatusCode::NOT_FOUND))), "First backup should have been removed, instead got response: {response:?}" ); Ok(()) } diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs index 1be2037d8..10572c21b 100644 --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -1,190 +1,185 @@ use bytesize::ByteSize; use comm_services_lib::{auth::UserIdentity, backup::LatestBackupIDResponse}; use commtest::{ backup::{ backup_utils::BackupData, create_new_backup, pull_backup::{self, BackupDescriptor}, }, + service_addr, tools::{generate_stable_nbytes, obtain_number_of_threads, Error}, }; -use std::env; use tokio::{runtime::Runtime, task::JoinSet}; #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BACKUP_SERVICE_HTTP) + .expect("failed to parse backup service url"); let number_of_threads = obtain_number_of_threads(); let rt = Runtime::new().unwrap(); println!( "Running performance tests for backup, number of threads: {}", number_of_threads ); let mut backup_data = vec![]; for i in 0..number_of_threads { backup_data.push(BackupData { backup_id: format!("b{i}"), user_keys_hash: format!("kh{i}"), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(i as u8), ), user_data_hash: format!("dh{i}"), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(i as u8), ), attachments: vec![], }); } let user_identities = [ UserIdentity { user_id: "1".to_string(), access_token: "dummy access token".to_string(), device_id: "dummy device_id".to_string(), }, UserIdentity { user_id: "2".to_string(), access_token: "dummy access token".to_string(), device_id: "dummy device_id".to_string(), }, ]; tokio::task::spawn_blocking(move || { println!("Creating new backups"); rt.block_on(async { let mut set = JoinSet::new(); for (i, item) in backup_data.iter().enumerate() { let url = url.clone(); let user = 
user_identities[i % user_identities.len()].clone();
         let item = item.clone();
         set.spawn(async move {
           create_new_backup::run(url, &user, &item).await.unwrap();
         });
       }

       while let Some(result) = set.join_next().await {
         result.unwrap();
       }
     });

     let mut latest_ids_for_user = vec![];

     println!("Reading latest ids");
     rt.block_on(async {
       let mut handlers = vec![];
       for user in &user_identities {
         let url = url.clone();
         let descriptor = BackupDescriptor::Latest {
           username: user.user_id.clone(),
         };
         handlers.push(tokio::spawn(async move {
           let response = pull_backup::run(
             url,
             descriptor,
             pull_backup::RequestedData::BackupID,
           )
           .await
           .unwrap();
           serde_json::from_slice::<LatestBackupIDResponse>(&response).unwrap()
         }));
       }

       for handler in handlers {
         latest_ids_for_user.push(handler.await.unwrap().backup_id);
       }
     });

     assert_eq!(latest_ids_for_user.len(), user_identities.len());

     let mut latest_user_keys_for_user = vec![];

     println!("Reading latest user keys");
     rt.block_on(async {
       let mut handlers = vec![];
       for user in &user_identities {
         let url = url.clone();
         let descriptor = BackupDescriptor::Latest {
           username: user.user_id.clone(),
         };
         handlers.push(tokio::spawn(async move {
           pull_backup::run(
             url,
             descriptor,
             pull_backup::RequestedData::UserKeys,
           )
           .await
           .unwrap()
         }));
       }

       for handler in handlers {
         latest_user_keys_for_user.push(handler.await.unwrap());
       }
     });

     assert_eq!(latest_user_keys_for_user.len(), user_identities.len());

     for (backup_id, user_keys) in
       latest_ids_for_user.iter().zip(latest_user_keys_for_user)
     {
       let backup = backup_data
         .iter()
         .find(|data| data.backup_id == *backup_id)
         .expect("Request should return existing backup data");

       assert_eq!(backup.user_keys, user_keys);
     }

     let mut latest_user_data_for_user = vec![];

     println!("Reading latest user data");
     rt.block_on(async {
       let mut handlers = vec![];
       for (i, backup_id) in latest_ids_for_user.iter().enumerate() {
         let url = url.clone();
         let descriptor = BackupDescriptor::BackupID {
           backup_id: backup_id.clone(),
           user_identity: user_identities[i % user_identities.len()].clone(),
         };
         handlers.push(tokio::spawn(async move {
           pull_backup::run(
             url,
             descriptor,
             pull_backup::RequestedData::UserData,
           )
           .await
           .unwrap()
         }));
       }

       for handler in handlers {
         latest_user_data_for_user.push(handler.await.unwrap());
       }
     });

     assert_eq!(latest_user_data_for_user.len(), user_identities.len());

     for (backup_id, user_data) in
       latest_ids_for_user.iter().zip(latest_user_data_for_user)
     {
       let backup = backup_data
         .iter()
         .find(|data| data.backup_id == *backup_id)
         .expect("Request should return existing backup data");

       assert_eq!(backup.user_data, user_data);
     }
   })
   .await
   .expect("Task panicked");

   Ok(())
 }
diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs
index 0027fec5d..f3f9b82e0 100644
--- a/services/commtest/tests/blob_integration_test.rs
+++ b/services/commtest/tests/blob_integration_test.rs
@@ -1,75 +1,73 @@
 use bytesize::ByteSize;
-use commtest::blob::{
-  blob_utils::{BlobData, BlobServiceClient},
-  get, put, remove,
-};
 use commtest::constants;
 use commtest::tools::Error;
-use std::env;
+use commtest::{
+  blob::{
+    blob_utils::{BlobData, BlobServiceClient},
+    get, put, remove,
+  },
+  service_addr,
+};

 #[tokio::test]
 async fn blob_integration_test() -> Result<(), Error> {
-  let port = env::var("COMM_SERVICES_PORT_BLOB")
-    .expect("port env var expected but not received")
-    .parse()
-    .expect("port env var should be a number");
-  let mut url = reqwest::Url::parse("http://localhost")?;
-  url.set_port(Some(port)).expect("failed to set port");
+  let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP)
+    .expect("failed to parse blob service url");

   let client = BlobServiceClient::new(url);

   let blob_data = vec![
     BlobData {
       holder: "test_holder001".to_string(),
       hash: "test_hash001".to_string(),
       chunks_sizes: vec![
         ByteSize::b(100).as_u64() as usize,
         ByteSize::b(100).as_u64() as usize,
         ByteSize::b(100).as_u64() as usize,
       ],
     },
     BlobData {
       holder: "test_holder002".to_string(),
       hash: "test_hash002".to_string(),
       chunks_sizes: vec![
         *constants::GRPC_CHUNK_SIZE_LIMIT,
         *constants::GRPC_CHUNK_SIZE_LIMIT,
         ByteSize::b(10).as_u64() as usize,
       ],
     },
     BlobData {
       holder: "test_holder003".to_string(),
       hash: "test_hash003".to_string(),
       chunks_sizes: vec![
         *constants::GRPC_CHUNK_SIZE_LIMIT,
         ByteSize::b(100).as_u64() as usize,
         *constants::GRPC_CHUNK_SIZE_LIMIT,
       ],
     },
   ];

   for item in &blob_data {
-    let data_exists: bool = put::run(&client, &item).await?;
+    let data_exists: bool = put::run(&client, item).await?;
     assert!(!data_exists, "test data should not exist");
   }

   for (i, blob_item) in blob_data.iter().enumerate() {
-    let received_sizes = get::run(&client, &blob_item).await?;
+    let received_sizes = get::run(&client, blob_item).await?;
     let expected_data_size = blob_item.chunks_sizes.iter().sum::<usize>();
     let received_data_size = received_sizes.iter().sum::<usize>();
     assert_eq!(
       expected_data_size, received_data_size,
       "invalid size of data for index {}, expected {}, got {}",
       i, expected_data_size, received_data_size
     );
   }

   for item in &blob_data {
-    remove::run(&client, &item).await?;
+    remove::run(&client, item).await?;
     assert!(
-      get::run(&client, &item).await.is_err(),
+      get::run(&client, item).await.is_err(),
       "item should no longer be available"
     );
   }

   Ok(())
 }
diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs
index 55a6581dc..b27f1b9af 100644
--- a/services/commtest/tests/blob_performance_test.rs
+++ b/services/commtest/tests/blob_performance_test.rs
@@ -1,116 +1,114 @@
 use bytesize::ByteSize;
-use commtest::blob::{
-  blob_utils::{BlobData, BlobServiceClient},
-  get, put, remove,
-};
 use commtest::tools::{obtain_number_of_threads, Error};
-use std::env;
+use commtest::{
+  blob::{
+    blob_utils::{BlobData, BlobServiceClient},
+    get, put, remove,
+  },
+  service_addr,
+};
 use tokio::runtime::Runtime;

 #[tokio::test]
 async fn blob_performance_test() -> Result<(), Error> {
-  let port = env::var("COMM_SERVICES_PORT_BLOB")
-    .expect("port env var expected but not received")
-    .parse()
-    .expect("port env var should be a number");
-  let mut url = reqwest::Url::parse("http://localhost")?;
-  url.set_port(Some(port)).expect("failed to set port");
+  let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP)
+    .expect("failed to parse blob service url");

   let client = BlobServiceClient::new(url);

   let number_of_threads = obtain_number_of_threads();

   println!(
     "Running performance tests for blob, number of threads: {}",
     number_of_threads
   );

   let mut blob_data = vec![];

   for i in 0..number_of_threads {
     let index: u64 = (i as u64) % 10;
     blob_data.push(BlobData {
       holder: format!("test_holder_{}", i),
       hash: format!("test_hash_{}", i),
       chunks_sizes: vec![
         ByteSize::kib(200 + (300 - index * 20)).as_u64() as usize,
         ByteSize::kib(500 + (400 - index * 20)).as_u64() as usize,
         ByteSize::kib(700 + (500 - index * 25)).as_u64() as usize,
       ],
     })
   }

   let rt = Runtime::new().unwrap();
   tokio::task::spawn_blocking(move || {
     // PUT
     rt.block_on(async {
       println!("performing PUT operations");
       let mut handlers = vec![];
       for item in &blob_data {
         let item_cloned = item.clone();
         let client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           let data_exists: bool =
             put::run(&client_cloned, &item_cloned).await.unwrap();
           assert!(!data_exists, "test data should not exist");
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });

     // GET
     rt.block_on(async {
       println!("performing GET operations");
       let mut handlers = vec![];
       for (i, item) in blob_data.iter().enumerate() {
         let item_cloned = item.clone();
         let client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           let received_sizes =
             get::run(&client_cloned, &item_cloned).await.unwrap();
           let expected_data_size =
             item_cloned.chunks_sizes.iter().sum::<usize>();
           let received_data_size = received_sizes.iter().sum::<usize>();
           assert_eq!(
             expected_data_size, received_data_size,
             "invalid size of data for index {}, expected {}, got {}",
             i, expected_data_size, received_data_size
           );
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });

     // REMOVE
     rt.block_on(async {
       println!("performing REMOVE operations");
       let mut handlers = vec![];
       for item in &blob_data {
         let item_cloned = item.clone();
         let client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           remove::run(&client_cloned, &item_cloned).await.unwrap();
           assert!(
             get::run(&client_cloned, &item_cloned).await.is_err(),
             "item should no longer be available"
           );
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });
   })
   .await
   .expect("Task panicked");

   Ok(())
 }
diff --git a/services/commtest/tests/grpc_client_test.rs b/services/commtest/tests/grpc_client_test.rs
index bb6ed0b12..5ba025816 100644
--- a/services/commtest/tests/grpc_client_test.rs
+++ b/services/commtest/tests/grpc_client_test.rs
@@ -1,36 +1,36 @@
-use commtest::identity::device::create_device;
+use commtest::{identity::device::create_device, service_addr};

 #[tokio::test]
 async fn verify_access_token() {
   use grpc_clients::identity::unauthenticated::client::verify_user_access_token;

   let device_info = create_device(None).await;

   let code_version = 100;
   let device_type = "android";

   let token_valid = verify_user_access_token(
-    "http://127.0.0.1:50054",
+    &service_addr::IDENTITY_GRPC.to_string(),
     &device_info.user_id,
     &device_info.device_id,
     &device_info.access_token,
     code_version,
     device_type.to_string(),
   )
   .await
   .expect("Failed to call identity's verify_user_access_token endpoint");

-  assert_eq!(token_valid, true);
+  assert!(token_valid);

   // Try again with invalid access token
   let token_valid = verify_user_access_token(
-    "http://127.0.0.1:50054",
+    &service_addr::IDENTITY_GRPC.to_string(),
     &device_info.user_id,
     &device_info.device_id,
     "garbage",
     code_version,
     device_type.to_string(),
   )
   .await
   .expect("Failed to call identity's verify_user_access_token endpoint");

-  assert_eq!(token_valid, false);
+  assert!(!token_valid);
 }
diff --git a/services/commtest/tests/identity_access_tokens_tests.rs b/services/commtest/tests/identity_access_tokens_tests.rs
index a9c915f77..ade6e2883 100644
--- a/services/commtest/tests/identity_access_tokens_tests.rs
+++ b/services/commtest/tests/identity_access_tokens_tests.rs
@@ -1,60 +1,63 @@
 use commtest::identity::device::{
   create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION,
 };
+use commtest::service_addr;
 use grpc_clients::identity::{
   get_unauthenticated_client,
   protos::client::{UploadOneTimeKeysRequest, VerifyUserAccessTokenRequest},
 };

 #[tokio::test]
 async fn verify_access_token() {
+  let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string();
   let device_info = create_device(None).await;

   let mut
identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let verify_request = VerifyUserAccessTokenRequest { user_id: device_info.user_id, signing_public_key: device_info.device_id, access_token: device_info.access_token, }; let response = identity_client .verify_user_access_token(verify_request) .await .unwrap(); - assert_eq!(response.into_inner().token_valid, true); + assert!(response.into_inner().token_valid); } #[tokio::test] async fn upload_one_time_keys() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { user_id: device_info.user_id, device_id: device_info.device_id, access_token: device_info.access_token, content_one_time_pre_keys: vec!["a".to_string(), "b".to_string()], notif_one_time_pre_keys: vec!["c".to_string(), "d".to_string()], }; // This send will fail if the one-time keys weren't successfully added identity_client .upload_one_time_keys(upload_request) .await .unwrap(); } diff --git a/services/commtest/tests/identity_keyserver_tests.rs b/services/commtest/tests/identity_keyserver_tests.rs index d645a0a69..d9966ae1f 100644 --- a/services/commtest/tests/identity_keyserver_tests.rs +++ b/services/commtest/tests/identity_keyserver_tests.rs @@ -1,81 +1,83 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, get_unauthenticated_client, protos::{ authenticated::OutboundKeysForUserRequest, client::UploadOneTimeKeysRequest, }, }; #[tokio::test] async fn set_prekey() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut client = get_auth_client( - "http://[::1]:50054", + &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id.clone(), device_info.access_token.clone(), PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { user_id: device_info.user_id.clone(), device_id: device_info.device_id, access_token: device_info.access_token, content_one_time_pre_keys: vec!["content1".to_string()], notif_one_time_pre_keys: vec!["notif1".to_string()], }; let mut unauthenticated_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); unauthenticated_client .upload_one_time_keys(upload_request) .await .expect("Failed to upload keys"); // Currently allowed to request your own outbound keys let keyserver_request = OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); let first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); assert_eq!( first_reponse.one_time_content_prekey, Some("content1".to_string()) ); assert_eq!( first_reponse.one_time_notif_prekey, Some("notif1".to_string()) 
); let second_reponse = client .get_keyserver_keys(keyserver_request) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The one time keys should be exhausted assert_eq!(second_reponse.one_time_content_prekey, None); assert_eq!(second_reponse.one_time_notif_prekey, None); } diff --git a/services/commtest/tests/identity_one_time_key_tests.rs b/services/commtest/tests/identity_one_time_key_tests.rs index 6ea7bae1e..f9617ffb4 100644 --- a/services/commtest/tests/identity_one_time_key_tests.rs +++ b/services/commtest/tests/identity_one_time_key_tests.rs @@ -1,35 +1,36 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_unauthenticated_client, protos::client::UploadOneTimeKeysRequest, }; #[tokio::test] async fn verify_access_token() { let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { user_id: device_info.user_id, device_id: device_info.device_id, access_token: device_info.access_token, content_one_time_pre_keys: vec![ "content1".to_string(), "content2".to_string(), ], notif_one_time_pre_keys: vec!["notif1".to_string(), "notif2".to_string()], }; identity_client .upload_one_time_keys(upload_request) .await .unwrap(); } diff --git a/services/commtest/tests/identity_prekey_tests.rs b/services/commtest/tests/identity_prekey_tests.rs index 95c9f5a02..d04283051 100644 --- a/services/commtest/tests/identity_prekey_tests.rs +++ b/services/commtest/tests/identity_prekey_tests.rs @@ -1,40 +1,41 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, protos::{authenticated::RefreshUserPreKeysRequest, client::PreKey}, }; #[tokio::test] async fn set_prekey() { let device_info = create_device(None).await; let mut client = get_auth_client( - "http://[::1]:50054", + &service_addr::IDENTITY_GRPC.to_string(), device_info.user_id, device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = RefreshUserPreKeysRequest { new_content_pre_keys: Some(PreKey { pre_key: "content_prekey".to_string(), pre_key_signature: "content_prekey_signature".to_string(), }), new_notif_pre_keys: Some(PreKey { pre_key: "content_prekey".to_string(), pre_key_signature: "content_prekey_signature".to_string(), }), }; // This send will fail if the one-time keys weren't successfully added println!( "Error: {:?}", client.refresh_user_pre_keys(upload_request).await ); } diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs index 839af3288..e316e5cf2 100644 --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -1,107 +1,109 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use grpc_clients::identity::protos::authenticated::OutboundKeysForUserRequest; use 
grpc_clients::identity::protos::client::UploadOneTimeKeysRequest; use grpc_clients::identity::{get_auth_client, get_unauthenticated_client}; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] #[should_panic] async fn test_tunnelbroker_invalid_auth() { let mut device_info = create_device(None).await; device_info.access_token = "".to_string(); let mut socket = create_socket(&device_info).await; socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_tunnelbroker_valid_auth() { let device_info = create_device(None).await; let mut socket = create_socket(&device_info).await; socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_refresh_keys_request_upon_depletion() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { user_id: device_info.user_id.clone(), device_id: device_info.device_id.clone(), access_token: device_info.access_token.clone(), content_one_time_pre_keys: vec!["content1".to_string()], notif_one_time_pre_keys: vec!["notif1".to_string()], }; identity_client .upload_one_time_keys(upload_request) .await .unwrap(); // Request outbound keys, which should trigger identity service to ask for more keys let mut client = get_auth_client( - "http://[::1]:50054", + &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let keyserver_request = OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); let _first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The current threshold is 5, but we only upload two. 
Should receive request // from Tunnelbroker to refresh keys // Create session as a keyserver let device_info = create_device(None).await; let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; assert_eq!(serialized_response, expected_response); }; } diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs index 8c41d02c5..e24942177 100644 --- a/services/commtest/tests/tunnelbroker_integration_tests.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -1,98 +1,99 @@ mod proto { tonic::include_proto!("tunnelbroker"); } use commtest::identity::device::create_device; use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::{create_socket, send_message}; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use std::time::Duration; use tokio::time::sleep; use tunnelbroker_messages::{ MessageToDevice as WebSocketMessageToDevice, RefreshKeyRequest, }; #[tokio::test] async fn send_refresh_request() { // Create session as a keyserver let device_info = create_device(None).await; let mut socket = create_socket(&device_info).await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = - TunnelbrokerServiceClient::connect("http://localhost:50051") + TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); let refresh_request = RefreshKeyRequest { device_id: device_info.device_id.clone(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.clone(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Have keyserver receive any websocket messages let response = socket.next().await.unwrap().unwrap(); // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } #[tokio::test] async fn test_messages_order() { let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; let messages = vec![ WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "first message".to_string(), }, WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "second message".to_string(), }, WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "third message".to_string(), }, ]; let mut sender_socket = create_socket(&sender).await; for msg in messages.clone() { send_message(&mut sender_socket, msg).await.unwrap(); } // Wait a specified duration to ensure that message had time to persist 
sleep(Duration::from_millis(100)).await; let mut receiver_socket = create_socket(&receiver).await; for msg in messages { if let Some(Ok(response)) = receiver_socket.next().await { let received_payload = response.to_text().unwrap(); assert_eq!(msg.payload, received_payload); } else { panic!("Unable to receive message"); } } } diff --git a/services/commtest/tests/tunnelbroker_persist_tests.rs b/services/commtest/tests/tunnelbroker_persist_tests.rs index 5e610b3a3..b3ace3760 100644 --- a/services/commtest/tests/tunnelbroker_persist_tests.rs +++ b/services/commtest/tests/tunnelbroker_persist_tests.rs @@ -1,88 +1,89 @@ mod proto { tonic::include_proto!("tunnelbroker"); } use commtest::identity::device::create_device; use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::{create_socket, send_message}; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; use std::time::Duration; use tokio::time::sleep; use tunnelbroker_messages::{ MessageToDevice as WebSocketMessageToDevice, RefreshKeyRequest, }; /// Tests that a message to an offline device gets pushed to dynamodb /// then recalled once a device connects #[tokio::test] async fn persist_grpc_messages() { let device_info = create_device(None).await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = - TunnelbrokerServiceClient::connect("http://localhost:50051") + TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); let refresh_request = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.to_string(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Wait a specified duration to ensure that message had time to persist sleep(Duration::from_millis(100)).await; let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); }; } #[tokio::test] async fn persist_websocket_messages() { let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; // Send message to not connected client let mut sender_socket = create_socket(&sender).await; let request = WebSocketMessageToDevice { device_id: receiver.device_id.clone(), payload: "persisted message".to_string(), }; send_message(&mut sender_socket, request.clone()) .await .unwrap(); // Wait a specified duration to ensure that message had time to persist sleep(Duration::from_millis(100)).await; // Connect receiver let mut receiver_socket = create_socket(&receiver).await; // Receive message if let Some(Ok(response)) = receiver_socket.next().await { let received_payload = response.to_text().unwrap(); assert_eq!(request.payload, received_payload); }; }
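
The new `ServiceAddr` resolver picks an address in three steps: a full endpoint override from `endpoint_env_var`, otherwise `scheme://localhost:` with the port taken from `port_env_var`, otherwise the hard-coded `default_port`. A minimal sketch of exercising that order is below; it assumes the `commtest` crate is available as a dependency, and the `60054` port and `identity.staging.example.com` host are made-up values used purely for illustration, not endpoints defined anywhere in this patch.

```rust
use commtest::service_addr;

fn main() {
  // With no overrides set, the default port is used:
  // IDENTITY_GRPC renders as "http://localhost:50054".
  println!("default:  {}", service_addr::IDENTITY_GRPC);

  // The port env var only swaps the port component...
  std::env::set_var("COMM_SERVICES_PORT_IDENTITY", "60054");
  println!("port var: {}", service_addr::IDENTITY_GRPC);

  // ...while the endpoint env var replaces the whole address.
  std::env::set_var(
    "IDENTITY_GRPC_ENDPOINT",
    "https://identity.staging.example.com:50054",
  );
  println!("endpoint: {}", service_addr::IDENTITY_GRPC);
}
```

Because the `Display` impl reads the environment on every call, overrides exported before a test run (for example in CI) take effect without rebuilding the test harness.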