diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -6,6 +6,7 @@ ClientPublicKeys, DEFAULT_CLIENT_KEYS, }; +use crate::service_addr; use grpc_clients::identity::protos::client::{ DeviceKeyUpload, DeviceType, IdentityKeyInfo, PreKey, RegistrationFinishRequest, RegistrationStartRequest, @@ -62,9 +63,8 @@ }), }; - // TODO: allow endpoint to be configured let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs --- a/services/commtest/src/lib.rs +++ b/services/commtest/src/lib.rs @@ -2,5 +2,6 @@ pub mod blob; pub mod constants; pub mod identity; +pub mod service_addr; pub mod tools; pub mod tunnelbroker; diff --git a/services/commtest/src/service_addr.rs b/services/commtest/src/service_addr.rs new file mode 100644 --- /dev/null +++ b/services/commtest/src/service_addr.rs @@ -0,0 +1,87 @@ +use std::env; +use std::fmt::Display; + +use tokio_tungstenite::tungstenite; + +pub static TUNNELBROKER_WS: &ServiceAddr = &ServiceAddr { + scheme: "ws", + endpoint_env_var: "TUNNELBROKER_WS_ENDPOINT", + port_env_var: "COMM_SERVICES_PORT_TUNNELBROKER_WS", + default_port: 51001, +}; +pub static TUNNELBROKER_GRPC: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "TUNNELBROKER_GRPC_ENDPOINT", + port_env_var: "COMM_SERVICES_PORT_TUNNELBROKER", + default_port: 50051, +}; +pub static BACKUP_SERVICE_HTTP: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "BACKUP_SERVICE_URL", + port_env_var: "COMM_SERVICES_PORT_BACKUP", + default_port: 50052, +}; +pub static BLOB_SERVICE_HTTP: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "BLOB_SERVICE_URL", + port_env_var: "COMM_SERVICES_PORT_BLOB", + default_port: 50053, +}; +pub static IDENTITY_GRPC: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "IDENTITY_GRPC_ENDPOINT", + port_env_var: "COMM_SERVICES_PORT_IDENTITY", + default_port: 50054, +}; + +#[derive(Debug, Clone, Copy)] +pub struct ServiceAddr { + scheme: &'static str, + default_port: u16, + /// Environment variable name for the endpoint + /// that overrides all other values + endpoint_env_var: &'static str, + /// Environment variable name for the port, overrides default_port + /// If `endpoint_env_var` is set, this is ignored. + /// Otherwise, addr is `scheme://localhost:${port_env_var}` + port_env_var: &'static str, +} + +impl Display for ServiceAddr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Ok(endpoint) = env::var(self.endpoint_env_var) { + return write!(f, "{}", endpoint); + } + + let port = env::var(self.port_env_var) + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(self.default_port); + + write!(f, "{}://localhost:{}", self.scheme, port) + } +} + +impl TryFrom<&ServiceAddr> for reqwest::Url { + type Error = url::ParseError; + + fn try_from(service_addr: &ServiceAddr) -> Result { + reqwest::Url::parse(&service_addr.to_string()) + } +} + +impl TryFrom<&ServiceAddr> for tonic::transport::Endpoint { + type Error = tonic::transport::Error; + + fn try_from(value: &ServiceAddr) -> Result { + value.to_string().try_into() + } +} + +impl tungstenite::client::IntoClientRequest for &ServiceAddr { + fn into_client_request( + self, + ) -> tungstenite::Result { + self.to_string().into_client_request() + } +} diff --git a/services/commtest/src/tunnelbroker/socket.rs b/services/commtest/src/tunnelbroker/socket.rs --- a/services/commtest/src/tunnelbroker/socket.rs +++ b/services/commtest/src/tunnelbroker/socket.rs @@ -1,4 +1,5 @@ use crate::identity::device::DeviceInfo; +use crate::service_addr; use futures_util::{SinkExt, StreamExt}; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::Message; @@ -11,7 +12,7 @@ pub async fn create_socket( device_info: &DeviceInfo, ) -> WebSocketStream> { - let (mut socket, _) = connect_async("ws://localhost:51001") + let (mut socket, _) = connect_async(service_addr::TUNNELBROKER_WS) .await .expect("Can't connect"); diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -6,20 +6,15 @@ create_new_backup, pull_backup::{self, BackupDescriptor, RequestedData}, }, + service_addr, tools::{generate_stable_nbytes, Error}, }; use reqwest::StatusCode; -use std::env; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BACKUP_SERVICE_HTTP) + .expect("failed to parse backup service url"); let backup_datas = [ BackupData { diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -6,20 +6,15 @@ create_new_backup, pull_backup::{self, BackupDescriptor}, }, + service_addr, tools::{generate_stable_nbytes, obtain_number_of_threads, Error}, }; -use std::env; use tokio::{runtime::Runtime, task::JoinSet}; #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BACKUP_SERVICE_HTTP) + .expect("failed to parse backup service url"); let number_of_threads = obtain_number_of_threads(); diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -1,20 +1,18 @@ use bytesize::ByteSize; -use commtest::blob::{ - blob_utils::{BlobData, BlobServiceClient}, - get, put, remove, -}; use commtest::constants; use commtest::tools::Error; -use std::env; +use commtest::{ + blob::{ + blob_utils::{BlobData, BlobServiceClient}, + get, put, remove, + }, + service_addr, +}; #[tokio::test] async fn blob_integration_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BLOB") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP) + .expect("failed to parse blob service url"); let client = BlobServiceClient::new(url); let blob_data = vec![ @@ -48,12 +46,12 @@ ]; for item in &blob_data { - let data_exists: bool = put::run(&client, &item).await?; + let data_exists: bool = put::run(&client, item).await?; assert!(!data_exists, "test data should not exist"); } for (i, blob_item) in blob_data.iter().enumerate() { - let received_sizes = get::run(&client, &blob_item).await?; + let received_sizes = get::run(&client, blob_item).await?; let expected_data_size = blob_item.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( @@ -64,9 +62,9 @@ } for item in &blob_data { - remove::run(&client, &item).await?; + remove::run(&client, item).await?; assert!( - get::run(&client, &item).await.is_err(), + get::run(&client, item).await.is_err(), "item should no longer be available" ); } diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -1,20 +1,18 @@ use bytesize::ByteSize; -use commtest::blob::{ - blob_utils::{BlobData, BlobServiceClient}, - get, put, remove, -}; use commtest::tools::{obtain_number_of_threads, Error}; -use std::env; +use commtest::{ + blob::{ + blob_utils::{BlobData, BlobServiceClient}, + get, put, remove, + }, + service_addr, +}; use tokio::runtime::Runtime; #[tokio::test] async fn blob_performance_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BLOB") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP) + .expect("failed to parse blob service url"); let client = BlobServiceClient::new(url); let number_of_threads = obtain_number_of_threads(); diff --git a/services/commtest/tests/grpc_client_test.rs b/services/commtest/tests/grpc_client_test.rs --- a/services/commtest/tests/grpc_client_test.rs +++ b/services/commtest/tests/grpc_client_test.rs @@ -1,4 +1,4 @@ -use commtest::identity::device::create_device; +use commtest::{identity::device::create_device, service_addr}; #[tokio::test] async fn verify_access_token() { @@ -8,7 +8,7 @@ let device_type = "android"; let token_valid = verify_user_access_token( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), &device_info.user_id, &device_info.device_id, &device_info.access_token, @@ -18,11 +18,11 @@ .await .expect("Failed to call identity's verify_user_access_token endpoint"); - assert_eq!(token_valid, true); + assert!(token_valid); // Try again with invalid access token let token_valid = verify_user_access_token( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), &device_info.user_id, &device_info.device_id, "garbage", @@ -32,5 +32,5 @@ .await .expect("Failed to call identity's verify_user_access_token endpoint"); - assert_eq!(token_valid, false); + assert!(!token_valid); } diff --git a/services/commtest/tests/identity_access_tokens_tests.rs b/services/commtest/tests/identity_access_tokens_tests.rs --- a/services/commtest/tests/identity_access_tokens_tests.rs +++ b/services/commtest/tests/identity_access_tokens_tests.rs @@ -1,6 +1,7 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_unauthenticated_client, protos::client::{UploadOneTimeKeysRequest, VerifyUserAccessTokenRequest}, @@ -8,10 +9,11 @@ #[tokio::test] async fn verify_access_token() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) @@ -29,15 +31,16 @@ .await .unwrap(); - assert_eq!(response.into_inner().token_valid, true); + assert!(response.into_inner().token_valid); } #[tokio::test] async fn upload_one_time_keys() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) diff --git a/services/commtest/tests/identity_keyserver_tests.rs b/services/commtest/tests/identity_keyserver_tests.rs --- a/services/commtest/tests/identity_keyserver_tests.rs +++ b/services/commtest/tests/identity_keyserver_tests.rs @@ -1,6 +1,7 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, get_unauthenticated_client, protos::{ @@ -10,10 +11,11 @@ #[tokio::test] async fn set_prekey() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut client = get_auth_client( - "http://[::1]:50054", + &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id.clone(), device_info.access_token.clone(), @@ -32,7 +34,7 @@ }; let mut unauthenticated_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) diff --git a/services/commtest/tests/identity_one_time_key_tests.rs b/services/commtest/tests/identity_one_time_key_tests.rs --- a/services/commtest/tests/identity_one_time_key_tests.rs +++ b/services/commtest/tests/identity_one_time_key_tests.rs @@ -1,6 +1,7 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_unauthenticated_client, protos::client::UploadOneTimeKeysRequest, }; @@ -10,7 +11,7 @@ let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) diff --git a/services/commtest/tests/identity_prekey_tests.rs b/services/commtest/tests/identity_prekey_tests.rs --- a/services/commtest/tests/identity_prekey_tests.rs +++ b/services/commtest/tests/identity_prekey_tests.rs @@ -1,6 +1,7 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use grpc_clients::identity::{ get_auth_client, protos::{authenticated::RefreshUserPreKeysRequest, client::PreKey}, @@ -11,7 +12,7 @@ let device_info = create_device(None).await; let mut client = get_auth_client( - "http://[::1]:50054", + &service_addr::IDENTITY_GRPC.to_string(), device_info.user_id, device_info.device_id, device_info.access_token, diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -1,6 +1,7 @@ use commtest::identity::device::{ create_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use grpc_clients::identity::protos::authenticated::OutboundKeysForUserRequest; @@ -36,10 +37,11 @@ #[tokio::test] async fn test_refresh_keys_request_upon_depletion() { + let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = create_device(None).await; let mut identity_client = get_unauthenticated_client( - "http://127.0.0.1:50054", + &identity_grpc_endpoint, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) @@ -61,7 +63,7 @@ // Request outbound keys, which should trigger identity service to ask for more keys let mut client = get_auth_client( - "http://[::1]:50054", + &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id, device_info.access_token, @@ -96,7 +98,7 @@ // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs --- a/services/commtest/tests/tunnelbroker_integration_tests.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -6,6 +6,7 @@ use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::{create_socket, send_message}; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; @@ -25,7 +26,7 @@ // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = - TunnelbrokerServiceClient::connect("http://localhost:50051") + TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); @@ -52,7 +53,7 @@ // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } diff --git a/services/commtest/tests/tunnelbroker_persist_tests.rs b/services/commtest/tests/tunnelbroker_persist_tests.rs --- a/services/commtest/tests/tunnelbroker_persist_tests.rs +++ b/services/commtest/tests/tunnelbroker_persist_tests.rs @@ -5,6 +5,7 @@ use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::{create_socket, send_message}; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; @@ -24,7 +25,7 @@ // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = - TunnelbrokerServiceClient::connect("http://localhost:50051") + TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); @@ -53,7 +54,7 @@ // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); }; }