diff --git a/services/commtest/src/backup/backup_utils.rs b/services/commtest/src/backup/backup_utils.rs new file mode 100644 index 000000000..964661ac2 --- /dev/null +++ b/services/commtest/src/backup/backup_utils.rs @@ -0,0 +1,97 @@ +use crate::identity::device::DeviceInfo; + +use crate::tools::generate_stable_nbytes; +use backup_client::{BackupData, Error as BackupClientError}; +use bytesize::ByteSize; +use comm_lib::auth::UserIdentity; +use comm_lib::backup::UploadLogRequest; +use reqwest::StatusCode; +use uuid::Uuid; + +pub fn generate_backup_data(predefined_byte_value: u8) -> BackupData { + BackupData { + backup_id: Uuid::new_v4().to_string(), + user_keys: generate_stable_nbytes( + ByteSize::kib(4).as_u64() as usize, + Some(predefined_byte_value), + ), + user_data: generate_stable_nbytes( + ByteSize::mib(4).as_u64() as usize, + Some(predefined_byte_value), + ), + attachments: vec![], + siwe_backup_msg: None, + } +} + +fn generate_log_data(backup_id: &str, value: u8) -> Vec { + const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize; + const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize; + + (1..30) + .map(|log_id| { + let size = if log_id % 2 == 0 { + IN_DB_SIZE + } else { + IN_BLOB_SIZE + }; + let attachments = if log_id % 10 == 0 { + Some(vec![Uuid::new_v4().to_string()]) + } else { + None + }; + let mut content = generate_stable_nbytes(size, Some(value)); + let unique_suffix = log_id.to_string(); + content.extend(unique_suffix.as_bytes()); + + UploadLogRequest { + backup_id: backup_id.to_string(), + log_id, + content, + attachments, + } + }) + .collect() +} + +pub fn generate_backup_data_with_logs( + predefined_byte_values: Vec, +) -> Vec<(BackupData, Vec)> { + predefined_byte_values + .into_iter() + .map(|byte_value| { + let backup_data = generate_backup_data(byte_value); + let log_data = generate_log_data(&backup_data.backup_id, byte_value); + (backup_data, log_data) + }) + .collect() +} + +pub fn assert_reqwest_error( + response: Result, + expected_status: StatusCode, +) { + match response { + Err(BackupClientError::ReqwestError(error)) => { + assert_eq!( + error.status(), + Some(expected_status), + "Expected status {}", + expected_status + ); + } + Err(err) => panic!( + "Backup should return ReqwestError, instead got response: {:?}", + err + ), + Ok(_) => panic!("Backup should return BackupClientError"), + } +} + +pub fn create_user_identity(device_info: DeviceInfo) -> UserIdentity { + UserIdentity { + user_id: device_info.user_id.clone(), + access_token: device_info.access_token.clone(), + device_id: device_info.device_id.clone(), + } +} diff --git a/services/commtest/src/backup/mod.rs b/services/commtest/src/backup/mod.rs new file mode 100644 index 000000000..14213fe35 --- /dev/null +++ b/services/commtest/src/backup/mod.rs @@ -0,0 +1 @@ +pub mod backup_utils; diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs index d00a97c90..49c6a686e 100644 --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -1,234 +1,235 @@ use comm_opaque2::client::{Login, Registration}; use grpc_clients::identity::{ get_auth_client, get_unauthenticated_client, PlatformMetadata, }; use rand::{distributions::Alphanumeric, Rng}; use std::borrow::Cow; use crate::identity::olm_account::{generate_random_olm_key, ClientPublicKeys}; use crate::service_addr; use grpc_clients::identity::protos::unauth::{ DeviceKeyUpload, DeviceType, Empty, IdentityKeyInfo, OpaqueLoginFinishRequest, OpaqueLoginStartRequest, Prekey, RegistrationFinishRequest, RegistrationStartRequest, VerifyUserAccessTokenRequest, }; pub const PLACEHOLDER_CODE_VERSION: u64 = 0; pub const DEVICE_TYPE: &str = "service"; const PASSWORD: &str = "pass"; +#[derive(Clone)] pub struct DeviceInfo { pub username: String, pub user_id: String, pub device_id: String, pub access_token: String, } impl From<&DeviceInfo> for VerifyUserAccessTokenRequest { fn from(value: &DeviceInfo) -> Self { Self { user_id: value.user_id.to_string(), device_id: value.device_id.to_string(), access_token: value.access_token.to_string(), } } } /// Register a new user with a device. /// - Gives random username (returned by function). /// - Device type defaults to keyserver. /// - Device ID taken from `keys` (ed25519), see [`DEFAULT_CLIENT_KEYS`] pub async fn register_user_device( keys: Option<&ClientPublicKeys>, device_type: Option, ) -> DeviceInfo { register_user_device_with_device_list(keys, device_type, None).await } /// Same as [`register_user_device`] but with third param being a /// stringified signed device list JSON pub async fn register_user_device_with_device_list( keys: Option<&ClientPublicKeys>, device_type: Option, initial_device_list: Option, ) -> DeviceInfo { let username: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); let device_keys = keys.map(Cow::Borrowed).unwrap_or_default(); let example_payload = serde_json::to_string(&device_keys) .expect("Failed to serialize example payload"); // The ed25519 value from the olm payload let device_id = device_keys.device_id(); let device_type = device_type.unwrap_or(DeviceType::Keyserver); let mut client_registration = Registration::new(); let opaque_registration_request = client_registration.start(PASSWORD).unwrap(); let registration_start_request = RegistrationStartRequest { opaque_registration_request, username: username.to_string(), device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: example_payload.to_string(), payload_signature: "foo".to_string(), }), content_upload: Some(Prekey { prekey: generate_random_olm_key(), prekey_signature: "content_prekey_sig".to_string(), }), notif_upload: Some(Prekey { prekey: generate_random_olm_key(), prekey_signature: "notif_prekey_sig".to_string(), }), one_time_content_prekeys: Vec::new(), one_time_notif_prekeys: Vec::new(), device_type: device_type.into(), }), farcaster_id: None, initial_device_list: initial_device_list.unwrap_or_default(), }; let mut identity_client = get_unauthenticated_client( &service_addr::IDENTITY_GRPC.to_string(), PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE), ) .await .expect("Couldn't connect to identity service"); let registration_start_response = identity_client .register_password_user_start(registration_start_request) .await .unwrap() .into_inner(); let opaque_registration_upload = client_registration .finish( PASSWORD, ®istration_start_response.opaque_registration_response, ) .unwrap(); let registration_finish_request = RegistrationFinishRequest { session_id: registration_start_response.session_id, opaque_registration_upload, }; let registration_finish_response = identity_client .register_password_user_finish(registration_finish_request) .await .unwrap() .into_inner(); DeviceInfo { username: username.to_string(), device_id: device_id.to_string(), user_id: registration_finish_response.user_id, access_token: registration_finish_response.access_token, } } /// Log in existing user with a device. /// - Tries to log in with given username (it has to be already registered) /// - Device type defaults to keyserver. /// - Device ID taken from `keys` (ed25519), see [`DEFAULT_CLIENT_KEYS`] pub async fn login_user_device( username: &str, keys: Option<&ClientPublicKeys>, device_type: Option, force: bool, ) -> DeviceInfo { let device_keys = keys.cloned().unwrap_or_default(); let example_payload = serde_json::to_string(&device_keys) .expect("Failed to serialize example payload"); // The ed25519 value from the olm payload let device_id = device_keys.device_id(); let device_type = device_type.unwrap_or(DeviceType::Keyserver); let mut client_login = Login::new(); let opaque_login_request = client_login.start(PASSWORD).unwrap(); let login_start_request = OpaqueLoginStartRequest { opaque_login_request, username: username.to_string(), device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: example_payload.to_string(), payload_signature: "foo".to_string(), }), content_upload: Some(Prekey { prekey: generate_random_olm_key(), prekey_signature: "content_prekey_sig".to_string(), }), notif_upload: Some(Prekey { prekey: generate_random_olm_key(), prekey_signature: "notif_prekey_sig".to_string(), }), one_time_content_prekeys: Vec::new(), one_time_notif_prekeys: Vec::new(), device_type: device_type.into(), }), force: Some(force), }; let mut identity_client = get_unauthenticated_client( &service_addr::IDENTITY_GRPC.to_string(), PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE), ) .await .expect("Couldn't connect to identity service"); let login_start_response = identity_client .log_in_password_user_start(login_start_request) .await .unwrap() .into_inner(); let opaque_login_upload = client_login .finish(&login_start_response.opaque_login_response) .unwrap(); let login_finish_request = OpaqueLoginFinishRequest { session_id: login_start_response.session_id, opaque_login_upload, }; let login_finish_response = identity_client .log_in_password_user_finish(login_finish_request) .await .unwrap() .into_inner(); DeviceInfo { username: username.to_string(), device_id: device_id.to_string(), user_id: login_finish_response.user_id, access_token: login_finish_response.access_token, } } pub async fn logout_user_device(device_info: DeviceInfo) { let DeviceInfo { user_id, device_id, access_token, .. } = device_info; let mut client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), user_id, device_id, access_token, PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE), ) .await .expect("Couldnt connect to auth identity service"); client .log_out_user(Empty {}) .await .expect("Failed to logout user"); } diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs index e4f84fa23..1b6efef9c 100644 --- a/services/commtest/src/lib.rs +++ b/services/commtest/src/lib.rs @@ -1,6 +1,7 @@ +pub mod backup; pub mod blob; pub mod constants; pub mod identity; pub mod service_addr; pub mod tools; pub mod tunnelbroker; diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs index 5acc94bde..c77533f69 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -1,236 +1,140 @@ use backup_client::{ - BackupClient, BackupData, BackupDescriptor, DownloadedLog, - Error as BackupClientError, LogUploadConfirmation, RequestedData, SinkExt, - StreamExt, TryStreamExt, + BackupClient, BackupDescriptor, DownloadedLog, LogUploadConfirmation, + RequestedData, SinkExt, StreamExt, TryStreamExt, }; -use bytesize::ByteSize; -use comm_lib::{ - auth::UserIdentity, - backup::{LatestBackupInfoResponse, UploadLogRequest}, +use comm_lib::backup::LatestBackupInfoResponse; +use commtest::backup::backup_utils::{ + assert_reqwest_error, create_user_identity, generate_backup_data_with_logs, }; use commtest::identity::device::register_user_device; -use commtest::{ - service_addr, - tools::{generate_stable_nbytes, Error}, -}; +use commtest::{service_addr, tools::Error}; use grpc_clients::identity::DeviceType; use reqwest::StatusCode; use std::collections::HashSet; -use uuid::Uuid; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; let device_info = register_user_device(None, Some(DeviceType::Ios)).await; + let user_identity = create_user_identity(device_info.clone()); - let user_identity = UserIdentity { - user_id: device_info.user_id.clone(), - access_token: device_info.access_token, - device_id: device_info.device_id, - }; - - let backup_datas = generate_backup_data(); + let backup_datas = generate_backup_data_with_logs(vec![b'a', b'b']); // Upload backups for (backup_data, log_datas) in &backup_datas { backup_client .upload_backup(&user_identity, backup_data.clone()) .await?; let (mut tx, rx) = backup_client.upload_logs(&user_identity).await?; for log_data in log_datas { tx.send(log_data.clone()).await?; } let result: HashSet = rx.take(log_datas.len()).try_collect().await?; let expected = log_datas .iter() .map(|data| LogUploadConfirmation { backup_id: data.backup_id.clone(), log_id: data.log_id, }) .collect(); assert_eq!(result, expected); } // Test direct lookup let (backup_data, log_datas) = &backup_datas[1]; let second_backup_descriptor = BackupDescriptor::BackupID { backup_id: backup_data.backup_id.clone(), user_identity: user_identity.clone(), }; let user_keys = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserKeys) .await?; assert_eq!(user_keys, backup_data.user_keys); let user_data = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserData) .await?; assert_eq!(user_data, backup_data.user_data); // Test latest backup lookup for nonexistent user let latest_backup_descriptor = BackupDescriptor::Latest { user_identifier: "nonexistent_user".to_string(), }; let nonexistent_user_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupInfo) .await; - match nonexistent_user_response { - Ok(_) => panic!("Expected error, but got success response"), - Err(BackupClientError::ReqwestError(error)) => { - assert_eq!( - error.status(), - Some(StatusCode::BAD_REQUEST), - "Expected bad request status" - ); - } - Err(_) => panic!("Unexpected error type"), - } + assert_reqwest_error(nonexistent_user_response, StatusCode::BAD_REQUEST); // Test latest backup lookup let latest_backup_descriptor = BackupDescriptor::Latest { user_identifier: device_info.username, }; let backup_info_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupInfo) .await?; let response: LatestBackupInfoResponse = serde_json::from_slice(&backup_info_response)?; assert_eq!(response.backup_id, backup_data.backup_id); assert_eq!(response.user_id, device_info.user_id); let user_keys = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys) .await?; assert_eq!(user_keys, backup_data.user_keys); // Test log download let log_stream = backup_client .download_logs(&user_identity, &backup_data.backup_id) .await; let downloaded_logs: Vec = log_stream.try_collect().await?; let expected_logs: Vec = log_datas .iter() .map(|data| DownloadedLog { content: data.content.clone(), attachments: data.attachments.clone(), }) .collect(); assert_eq!(downloaded_logs, expected_logs); // Test backup cleanup let (removed_backup, _) = &backup_datas[0]; let removed_backup_descriptor = BackupDescriptor::BackupID { backup_id: removed_backup.backup_id.clone(), user_identity: user_identity.clone(), }; let response = backup_client .download_backup_data(&removed_backup_descriptor, RequestedData::UserKeys) .await; - let Err(BackupClientError::ReqwestError(error)) = response else { - panic!("First backup should have been removed, instead got response: {response:?}"); - }; - - assert_eq!( - error.status(), - Some(StatusCode::NOT_FOUND), - "Expected status 'not found'" - ); + assert_reqwest_error(response, StatusCode::NOT_FOUND); // Test log cleanup let log_stream = backup_client .download_logs(&user_identity, &removed_backup.backup_id) .await; let downloaded_logs: Vec = log_stream.try_collect().await?; if !downloaded_logs.is_empty() { panic!( "Logs for first backup should have been removed, \ instead got: {downloaded_logs:?}" ) } Ok(()) } - -fn generate_backup_data() -> [(BackupData, Vec); 2] { - [ - ( - BackupData { - backup_id: "b1".to_string(), - user_keys: generate_stable_nbytes( - ByteSize::kib(4).as_u64() as usize, - Some(b'a'), - ), - user_data: generate_stable_nbytes( - ByteSize::mib(4).as_u64() as usize, - Some(b'A'), - ), - attachments: vec![], - siwe_backup_msg: None, - }, - generate_log_data("b1", b'a'), - ), - ( - BackupData { - backup_id: "b2".to_string(), - user_keys: generate_stable_nbytes( - ByteSize::kib(4).as_u64() as usize, - Some(b'b'), - ), - user_data: generate_stable_nbytes( - ByteSize::mib(4).as_u64() as usize, - Some(b'B'), - ), - attachments: vec![], - siwe_backup_msg: None, - }, - generate_log_data("b2", b'b'), - ), - ] -} - -fn generate_log_data(backup_id: &str, value: u8) -> Vec { - const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize; - const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize; - - (1..30) - .map(|log_id| { - let size = if log_id % 2 == 0 { - IN_DB_SIZE - } else { - IN_BLOB_SIZE - }; - let attachments = if log_id % 10 == 0 { - Some(vec![Uuid::new_v4().to_string()]) - } else { - None - }; - let mut content = generate_stable_nbytes(size, Some(value)); - let unique_suffix = log_id.to_string(); - content.extend(unique_suffix.as_bytes()); - - UploadLogRequest { - backup_id: backup_id.to_string(), - log_id, - content, - attachments, - } - }) - .collect() -} diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs index 0434002ac..98f84d109 100644 --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -1,189 +1,169 @@ -use backup_client::{ - BackupClient, BackupData, BackupDescriptor, RequestedData, +use backup_client::{BackupClient, BackupDescriptor, RequestedData}; + +use comm_lib::backup::LatestBackupInfoResponse; +use commtest::backup::backup_utils::{ + create_user_identity, generate_backup_data, }; -use bytesize::ByteSize; -use comm_lib::{auth::UserIdentity, backup::LatestBackupInfoResponse}; use commtest::identity::device::register_user_device; use commtest::{ service_addr, - tools::{generate_stable_nbytes, obtain_number_of_threads, Error}, + tools::{obtain_number_of_threads, Error}, }; use grpc_clients::identity::DeviceType; use tokio::{runtime::Runtime, task::JoinSet}; #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; let number_of_threads = obtain_number_of_threads(); let rt = Runtime::new().unwrap(); println!( "Running performance tests for backup, number of threads: {}", number_of_threads ); - let mut backup_data = vec![]; - for i in 0..number_of_threads { - backup_data.push(BackupData { - backup_id: format!("b{i}"), - user_keys: generate_stable_nbytes( - ByteSize::kib(4).as_u64() as usize, - Some(i as u8), - ), - user_data: generate_stable_nbytes( - ByteSize::mib(4).as_u64() as usize, - Some(i as u8), - ), - attachments: vec![], - siwe_backup_msg: None, - }); - } + let backup_data: Vec<_> = (0..number_of_threads) + .map(|i| generate_backup_data(i as u8)) + .collect(); let device_info_1 = register_user_device(None, Some(DeviceType::Ios)).await; let device_info_2 = register_user_device(None, Some(DeviceType::Ios)).await; let user_identities = [ - UserIdentity { - user_id: device_info_1.user_id.clone(), - access_token: device_info_1.access_token, - device_id: device_info_1.device_id, - }, - UserIdentity { - user_id: device_info_2.user_id.clone(), - access_token: device_info_2.access_token, - device_id: device_info_2.device_id, - }, + create_user_identity(device_info_1.clone()), + create_user_identity(device_info_2.clone()), ]; tokio::task::spawn_blocking(move || { println!("Creating new backups"); rt.block_on(async { let mut set = JoinSet::new(); for (i, item) in backup_data.iter().cloned().enumerate() { let backup_client = backup_client.clone(); let user = user_identities[i % user_identities.len()].clone(); set.spawn(async move { backup_client.upload_backup(&user, item).await.unwrap(); }); } while let Some(result) = set.join_next().await { result.unwrap(); } }); let mut latest_ids_for_user = vec![]; println!("Reading latest ids"); rt.block_on(async { let mut handlers = vec![]; for user in &user_identities { let backup_client = backup_client.clone(); let user_identifier = if user.user_id == device_info_1.user_id { device_info_1.username.clone() } else { device_info_2.username.clone() }; let descriptor = BackupDescriptor::Latest { user_identifier }; handlers.push(tokio::spawn(async move { let response = backup_client .download_backup_data(&descriptor, RequestedData::BackupInfo) .await .unwrap(); serde_json::from_slice::(&response).unwrap() })); } for handler in handlers { latest_ids_for_user.push(handler.await.unwrap().backup_id); } }); assert_eq!(latest_ids_for_user.len(), user_identities.len()); let mut latest_user_keys_for_user = vec![]; println!("Reading latest user keys"); rt.block_on(async { let mut handlers = vec![]; for user in &user_identities { let backup_client = backup_client.clone(); let user_identifier = if user.user_id == device_info_1.user_id { device_info_1.username.clone() } else { device_info_2.username.clone() }; let descriptor = BackupDescriptor::Latest { user_identifier }; handlers.push(tokio::spawn(async move { backup_client .download_backup_data(&descriptor, RequestedData::UserKeys) .await .unwrap() })); } for handler in handlers { latest_user_keys_for_user.push(handler.await.unwrap()); } }); assert_eq!(latest_user_keys_for_user.len(), user_identities.len()); for (backup_id, user_keys) in latest_ids_for_user.iter().zip(latest_user_keys_for_user) { let backup = backup_data .iter() .find(|data| data.backup_id == *backup_id) .expect("Request should return existing backup data"); assert_eq!(backup.user_keys, user_keys); } let mut latest_user_data_for_user = vec![]; println!("Reading latest user data"); rt.block_on(async { let mut handlers = vec![]; for (i, backup_id) in latest_ids_for_user.iter().enumerate() { let backup_client = backup_client.clone(); let descriptor = BackupDescriptor::BackupID { backup_id: backup_id.clone(), user_identity: user_identities[i % user_identities.len()].clone(), }; handlers.push(tokio::spawn(async move { backup_client .download_backup_data(&descriptor, RequestedData::UserData) .await .unwrap() })); } for handler in handlers { latest_user_data_for_user.push(handler.await.unwrap()); } }); assert_eq!(latest_user_data_for_user.len(), user_identities.len()); for (backup_id, user_data) in latest_ids_for_user.iter().zip(latest_user_data_for_user) { let backup = backup_data .iter() .find(|data| data.backup_id == *backup_id) .expect("Request should return existing backup data"); assert_eq!(backup.user_data, user_data); } }) .await .expect("Task panicked"); Ok(()) }