diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -6,6 +6,7 @@ use crate::identity::olm_account_infos::{ ClientPublicKeys, DEFAULT_CLIENT_KEYS, }; +use crate::service_addr; use proto as client; use proto::{ identity_client_service_client::IdentityClientServiceClient, DeviceKeyUpload, @@ -62,7 +63,7 @@ // TODO: allow endpoint to be configured let mut identity_client = - IdentityClientServiceClient::connect("http://127.0.0.1:50054") + IdentityClientServiceClient::connect(service_addr::IDENTITY_GRPC) .await .expect("Couldn't connect to identitiy service"); diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs --- a/services/commtest/src/lib.rs +++ b/services/commtest/src/lib.rs @@ -2,5 +2,6 @@ pub mod blob; pub mod constants; pub mod identity; +pub mod service_addr; pub mod tools; pub mod tunnelbroker; diff --git a/services/commtest/src/service_addr.rs b/services/commtest/src/service_addr.rs new file mode 100644 --- /dev/null +++ b/services/commtest/src/service_addr.rs @@ -0,0 +1,88 @@ +use std::env; +use std::fmt::Display; + +use tokio_tungstenite::tungstenite; + +pub static TUNNELBROKER_WS: &ServiceAddr = &ServiceAddr { + scheme: "ws", + endpoint_env_var: "TUNNELBROKER_WS_ENDPOINT", + port_env_var: "COMM_SERVICES_PORT_TUNNELBROKER_WS", + default_port: 51001, +}; + +pub static TUNNELBROKER_GRPC: &ServiceAddr = &ServiceAddr { + scheme: "ws", + endpoint_env_var: "TUNNELBROKER_GRPC_ENDPOINT", + port_env_var: "COMM_SERVICES_PORT_TUNNELBROKER", + default_port: 50051, +}; +pub static BACKUP_SERVICE_HTTP: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "BACKUP_SERVICE_URL", + port_env_var: "COMM_SERVICES_PORT_BACKUP", + default_port: 50052, +}; +pub static BLOB_SERVICE_HTTP: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "BLOB_SERVICE_URL", + port_env_var: "COMM_SERVICES_PORT_BLOB", + default_port: 50053, +}; +pub static IDENTITY_GRPC: &ServiceAddr = &ServiceAddr { + scheme: "http", + endpoint_env_var: "IDENTITY_GRPC_ENDPOINT", + port_env_var: "COMM_SERVICES_PORT_IDENTITY", + default_port: 50054, +}; + +#[derive(Debug, Clone, Copy)] +pub struct ServiceAddr { + scheme: &'static str, + default_port: u16, + /// Environment variable name for the endpoint + /// that overrides all other values + endpoint_env_var: &'static str, + /// Environment variable name for the port, overrides default_port + /// If `endpoint_env_var` is set, this is ignored. + /// Otherwise, addr is `scheme://localhost:${port_env_var}` + port_env_var: &'static str, +} + +impl Display for ServiceAddr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Ok(endpoint) = env::var(self.endpoint_env_var) { + return write!(f, "{}", endpoint); + } + + let port = env::var(self.port_env_var) + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(self.default_port); + + write!(f, "{}://localhost:{}", self.scheme, port) + } +} + +impl TryFrom<&ServiceAddr> for reqwest::Url { + type Error = url::ParseError; + + fn try_from(service_addr: &ServiceAddr) -> Result { + reqwest::Url::parse(&service_addr.to_string()) + } +} + +impl TryFrom<&ServiceAddr> for tonic::transport::Endpoint { + type Error = tonic::transport::Error; + + fn try_from(value: &ServiceAddr) -> Result { + value.to_string().try_into() + } +} + +impl tungstenite::client::IntoClientRequest for &ServiceAddr { + fn into_client_request( + self, + ) -> tungstenite::Result { + self.to_string().into_client_request() + } +} diff --git a/services/commtest/src/tunnelbroker/socket.rs b/services/commtest/src/tunnelbroker/socket.rs --- a/services/commtest/src/tunnelbroker/socket.rs +++ b/services/commtest/src/tunnelbroker/socket.rs @@ -1,4 +1,5 @@ use crate::identity::device::DeviceInfo; +use crate::service_addr; use futures_util::SinkExt; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::Message; @@ -8,7 +9,7 @@ pub async fn create_socket( device_info: &DeviceInfo, ) -> WebSocketStream> { - let (mut socket, _) = connect_async("ws://localhost:51001") + let (mut socket, _) = connect_async(service_addr::TUNNELBROKER_WS) .await .expect("Can't connect"); diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -6,20 +6,15 @@ create_new_backup, pull_backup::{self, BackupDescriptor, RequestedData}, }, + service_addr, tools::{generate_stable_nbytes, Error}, }; use reqwest::StatusCode; -use std::env; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BACKUP_SERVICE_HTTP) + .expect("failed to parse backup service url"); let backup_datas = [ BackupData { diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -6,20 +6,15 @@ create_new_backup, pull_backup::{self, BackupDescriptor}, }, + service_addr, tools::{generate_stable_nbytes, obtain_number_of_threads, Error}, }; -use std::env; use tokio::{runtime::Runtime, task::JoinSet}; #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BACKUP_SERVICE_HTTP) + .expect("failed to parse backup service url"); let number_of_threads = obtain_number_of_threads(); diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -1,20 +1,18 @@ use bytesize::ByteSize; -use commtest::blob::{ - blob_utils::{BlobData, BlobServiceClient}, - get, put, remove, -}; use commtest::constants; use commtest::tools::Error; -use std::env; +use commtest::{ + blob::{ + blob_utils::{BlobData, BlobServiceClient}, + get, put, remove, + }, + service_addr, +}; #[tokio::test] async fn blob_integration_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BLOB") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP) + .expect("failed to parse blob service url"); let client = BlobServiceClient::new(url); let blob_data = vec![ @@ -48,12 +46,12 @@ ]; for item in &blob_data { - let data_exists: bool = put::run(&client, &item).await?; + let data_exists: bool = put::run(&client, item).await?; assert!(!data_exists, "test data should not exist"); } for (i, blob_item) in blob_data.iter().enumerate() { - let received_sizes = get::run(&client, &blob_item).await?; + let received_sizes = get::run(&client, blob_item).await?; let expected_data_size = blob_item.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( @@ -64,9 +62,9 @@ } for item in &blob_data { - remove::run(&client, &item).await?; + remove::run(&client, item).await?; assert!( - get::run(&client, &item).await.is_err(), + get::run(&client, item).await.is_err(), "item should no longer be available" ); } diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -1,20 +1,18 @@ use bytesize::ByteSize; -use commtest::blob::{ - blob_utils::{BlobData, BlobServiceClient}, - get, put, remove, -}; use commtest::tools::{obtain_number_of_threads, Error}; -use std::env; +use commtest::{ + blob::{ + blob_utils::{BlobData, BlobServiceClient}, + get, put, remove, + }, + service_addr, +}; use tokio::runtime::Runtime; #[tokio::test] async fn blob_performance_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BLOB") - .expect("port env var expected but not received") - .parse() - .expect("port env var should be a number"); - let mut url = reqwest::Url::parse("http://localhost")?; - url.set_port(Some(port)).expect("failed to set port"); + let url = reqwest::Url::try_from(service_addr::BLOB_SERVICE_HTTP) + .expect("failed to parse blob service url"); let client = BlobServiceClient::new(url); let number_of_threads = obtain_number_of_threads(); diff --git a/services/commtest/tests/grpc_client_test.rs b/services/commtest/tests/grpc_client_test.rs --- a/services/commtest/tests/grpc_client_test.rs +++ b/services/commtest/tests/grpc_client_test.rs @@ -1,4 +1,4 @@ -use commtest::identity::device::create_device; +use commtest::{identity::device::create_device, service_addr}; #[tokio::test] async fn verify_access_token() { @@ -8,7 +8,7 @@ let device_type = "android"; let token_valid = verify_user_access_token( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), &device_info.user_id, &device_info.device_id, &device_info.access_token, @@ -18,11 +18,11 @@ .await .expect("Failed to call identity's verify_user_access_token endpoint"); - assert_eq!(token_valid, true); + assert!(token_valid); // Try again with invalid access token let token_valid = verify_user_access_token( - "http://127.0.0.1:50054", + &service_addr::IDENTITY_GRPC.to_string(), &device_info.user_id, &device_info.device_id, "garbage", @@ -32,5 +32,5 @@ .await .expect("Failed to call identity's verify_user_access_token endpoint"); - assert_eq!(token_valid, false); + assert!(!token_valid); } diff --git a/services/commtest/tests/identity_access_tokens_tests.rs b/services/commtest/tests/identity_access_tokens_tests.rs --- a/services/commtest/tests/identity_access_tokens_tests.rs +++ b/services/commtest/tests/identity_access_tokens_tests.rs @@ -1,6 +1,7 @@ mod proto { tonic::include_proto!("identity.client"); } +use commtest::service_addr; use proto as client; mod auth_proto { tonic::include_proto!("identity.authenticated"); @@ -14,7 +15,7 @@ let device_info = create_device(None).await; let mut identity_client = - IdentityClientServiceClient::connect("http://127.0.0.1:50054") + IdentityClientServiceClient::connect(service_addr::IDENTITY_GRPC) .await .expect("Couldn't connect to identitiy service"); @@ -29,7 +30,7 @@ .await .unwrap(); - assert_eq!(response.into_inner().token_valid, true); + assert!(response.into_inner().token_valid); } #[tokio::test] @@ -37,7 +38,7 @@ let device_info = create_device(None).await; let mut identity_client = - IdentityClientServiceClient::connect("http://127.0.0.1:50054") + IdentityClientServiceClient::connect(service_addr::IDENTITY_GRPC) .await .expect("Couldn't connect to identitiy service"); diff --git a/services/commtest/tests/identity_keyserver_tests.rs b/services/commtest/tests/identity_keyserver_tests.rs --- a/services/commtest/tests/identity_keyserver_tests.rs +++ b/services/commtest/tests/identity_keyserver_tests.rs @@ -8,14 +8,15 @@ use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient; use auth_proto::OutboundKeysForUserRequest; use client::UploadOneTimeKeysRequest; -use commtest::identity::device::create_device; +use commtest::{identity::device::create_device, service_addr}; use tonic::{transport::Endpoint, Request}; #[tokio::test] async fn set_prekey() { let device_info = create_device(None).await; - let channel = Endpoint::from_static("http://[::1]:50054") + let channel = Endpoint::try_from(service_addr::IDENTITY_GRPC) + .expect("failed to parse identity service endpoint") .connect() .await .unwrap(); @@ -39,7 +40,7 @@ }; let mut unauthenticated_client = - proto::identity_client_service_client::IdentityClientServiceClient::connect("http://127.0.0.1:50054") + proto::identity_client_service_client::IdentityClientServiceClient::connect(service_addr::IDENTITY_GRPC) .await .expect("Couldn't connect to identitiy service"); diff --git a/services/commtest/tests/identity_one_time_key_tests.rs b/services/commtest/tests/identity_one_time_key_tests.rs --- a/services/commtest/tests/identity_one_time_key_tests.rs +++ b/services/commtest/tests/identity_one_time_key_tests.rs @@ -7,13 +7,14 @@ use client::identity_client_service_client::IdentityClientServiceClient; use client::UploadOneTimeKeysRequest; use commtest::identity::device::create_device; +use commtest::service_addr; #[tokio::test] async fn verify_access_token() { let device_info = create_device(None).await; let mut identity_client = - IdentityClientServiceClient::connect("http://127.0.0.1:50054") + IdentityClientServiceClient::connect(service_addr::IDENTITY_GRPC) .await .expect("Couldn't connect to identity service"); diff --git a/services/commtest/tests/identity_prekey_tests.rs b/services/commtest/tests/identity_prekey_tests.rs --- a/services/commtest/tests/identity_prekey_tests.rs +++ b/services/commtest/tests/identity_prekey_tests.rs @@ -8,14 +8,15 @@ use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient; use auth_proto::RefreshUserPreKeysRequest; use client::PreKey; -use commtest::identity::device::create_device; +use commtest::{identity::device::create_device, service_addr}; use tonic::{transport::Endpoint, Request}; #[tokio::test] async fn set_prekey() { let device_info = create_device(None).await; - let channel = Endpoint::from_static("http://[::1]:50054") + let channel = Endpoint::try_from(service_addr::IDENTITY_GRPC) + .expect("failed to parse identity service endpoint") .connect() .await .unwrap(); diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -8,6 +8,7 @@ use client::identity_client_service_client::IdentityClientServiceClient; use client::UploadOneTimeKeysRequest; use commtest::identity::device::create_device; +use commtest::service_addr; use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use tonic::transport::Endpoint; @@ -45,7 +46,7 @@ let device_info = create_device(None).await; let mut identity_client = - IdentityClientServiceClient::connect("http://127.0.0.1:50054") + IdentityClientServiceClient::connect(service_addr::IDENTITY_GRPC) .await .expect("Couldn't connect to identitiy service"); @@ -63,7 +64,8 @@ .unwrap(); // Request outbound keys, which should trigger identity service to ask for more keys - let channel = Endpoint::from_static("http://[::1]:50054") + let channel = Endpoint::try_from(service_addr::IDENTITY_GRPC) + .expect("failed to parse identity service endpoint") .connect() .await .unwrap(); @@ -103,7 +105,7 @@ // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs --- a/services/commtest/tests/tunnelbroker_integration_tests.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -6,6 +6,7 @@ use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::create_socket; use futures_util::{SinkExt, StreamExt}; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; @@ -26,7 +27,7 @@ // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = - TunnelbrokerServiceClient::connect("http://localhost:50051") + TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); @@ -53,7 +54,7 @@ // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } diff --git a/services/commtest/tests/tunnelbroker_persist_tests.rs b/services/commtest/tests/tunnelbroker_persist_tests.rs --- a/services/commtest/tests/tunnelbroker_persist_tests.rs +++ b/services/commtest/tests/tunnelbroker_persist_tests.rs @@ -6,6 +6,7 @@ use commtest::identity::olm_account_infos::{ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, }; +use commtest::service_addr; use commtest::tunnelbroker::socket::create_socket; use futures_util::{SinkExt, StreamExt}; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; @@ -26,7 +27,7 @@ // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = - TunnelbrokerServiceClient::connect("http://localhost:50051") + TunnelbrokerServiceClient::connect(service_addr::TUNNELBROKER_GRPC) .await .unwrap(); @@ -55,7 +56,7 @@ // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = - serde_json::from_str(&response.to_text().unwrap()).unwrap(); + serde_json::from_str(response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); }; }