diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs index 4cd2d7777..568ec103d 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -1,211 +1,215 @@ use backup_client::{ BackupClient, BackupData, BackupDescriptor, DownloadedLog, Error as BackupClientError, LogUploadConfirmation, RequestedData, SinkExt, StreamExt, TryStreamExt, }; use bytesize::ByteSize; use comm_lib::{ auth::UserIdentity, backup::{LatestBackupIDResponse, UploadLogRequest}, }; +use commtest::identity::device::register_user_device; use commtest::{ service_addr, tools::{generate_stable_nbytes, Error}, }; +use grpc_clients::identity::DeviceType; use reqwest::StatusCode; use std::collections::HashSet; use uuid::Uuid; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; + let device_info = register_user_device(None, Some(DeviceType::Ios)).await; + let user_identity = UserIdentity { - user_id: "1".to_string(), - access_token: "dummy access token".to_string(), - device_id: "dummy device_id".to_string(), + user_id: device_info.user_id.clone(), + access_token: device_info.access_token, + device_id: device_info.device_id, }; let backup_datas = generate_backup_data(); // Upload backups for (backup_data, log_datas) in &backup_datas { backup_client .upload_backup(&user_identity, backup_data.clone()) .await?; let (mut tx, rx) = backup_client.upload_logs(&user_identity).await?; for log_data in log_datas { tx.send(log_data.clone()).await?; } let result: HashSet = rx.take(log_datas.len()).try_collect().await?; let expected = log_datas .iter() .map(|data| LogUploadConfirmation { backup_id: data.backup_id.clone(), log_id: data.log_id, }) .collect(); assert_eq!(result, expected); } // Test direct lookup let (backup_data, log_datas) = &backup_datas[1]; let second_backup_descriptor = BackupDescriptor::BackupID { backup_id: backup_data.backup_id.clone(), user_identity: user_identity.clone(), }; let user_keys = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserKeys) .await?; assert_eq!(user_keys, backup_data.user_keys); let user_data = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserData) .await?; assert_eq!(user_data, backup_data.user_data); // Test latest backup lookup let latest_backup_descriptor = BackupDescriptor::Latest { // Initial version of the backup service uses `user_id` in place of a username - username: "1".to_string(), + username: device_info.user_id.to_string(), }; let backup_id_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID) .await?; let response: LatestBackupIDResponse = serde_json::from_slice(&backup_id_response)?; assert_eq!(response.backup_id, backup_data.backup_id); let user_keys = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys) .await?; assert_eq!(user_keys, backup_data.user_keys); // Test log download let log_stream = backup_client .download_logs(&user_identity, &backup_data.backup_id) .await; let downloaded_logs: Vec = log_stream.try_collect().await?; let expected_logs: Vec = log_datas .iter() .map(|data| DownloadedLog { content: data.content.clone(), attachments: data.attachments.clone(), }) .collect(); assert_eq!(downloaded_logs, expected_logs); // Test backup cleanup let (removed_backup, _) = &backup_datas[0]; let removed_backup_descriptor = BackupDescriptor::BackupID { backup_id: removed_backup.backup_id.clone(), user_identity: user_identity.clone(), }; let response = backup_client .download_backup_data(&removed_backup_descriptor, RequestedData::UserKeys) .await; let Err(BackupClientError::ReqwestError(error)) = response else { panic!("First backup should have been removed, instead got response: {response:?}"); }; assert_eq!( error.status(), Some(StatusCode::NOT_FOUND), "Expected status 'not found'" ); // Test log cleanup let log_stream = backup_client .download_logs(&user_identity, &removed_backup.backup_id) .await; let downloaded_logs: Vec = log_stream.try_collect().await?; if !downloaded_logs.is_empty() { panic!( "Logs for first backup should have been removed, \ instead got: {downloaded_logs:?}" ) } Ok(()) } fn generate_backup_data() -> [(BackupData, Vec); 2] { [ ( BackupData { backup_id: "b1".to_string(), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(b'a'), ), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(b'A'), ), attachments: vec![], siwe_backup_msg: None, }, generate_log_data("b1", b'a'), ), ( BackupData { backup_id: "b2".to_string(), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(b'b'), ), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(b'B'), ), attachments: vec![], siwe_backup_msg: None, }, generate_log_data("b2", b'b'), ), ] } fn generate_log_data(backup_id: &str, value: u8) -> Vec { const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize; const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize; (1..30) .map(|log_id| { let size = if log_id % 2 == 0 { IN_DB_SIZE } else { IN_BLOB_SIZE }; let attachments = if log_id % 10 == 0 { Some(vec![Uuid::new_v4().to_string()]) } else { None }; let mut content = generate_stable_nbytes(size, Some(value)); let unique_suffix = log_id.to_string(); content.extend(unique_suffix.as_bytes()); UploadLogRequest { backup_id: backup_id.to_string(), log_id, content, attachments, } }) .collect() } diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs index 6168bc4f5..522d04b90 100644 --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -1,172 +1,177 @@ use backup_client::{ BackupClient, BackupData, BackupDescriptor, RequestedData, }; use bytesize::ByteSize; use comm_lib::{auth::UserIdentity, backup::LatestBackupIDResponse}; +use commtest::identity::device::register_user_device; use commtest::{ service_addr, tools::{generate_stable_nbytes, obtain_number_of_threads, Error}, }; +use grpc_clients::identity::DeviceType; use tokio::{runtime::Runtime, task::JoinSet}; #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; let number_of_threads = obtain_number_of_threads(); let rt = Runtime::new().unwrap(); println!( "Running performance tests for backup, number of threads: {}", number_of_threads ); let mut backup_data = vec![]; for i in 0..number_of_threads { backup_data.push(BackupData { backup_id: format!("b{i}"), user_keys: generate_stable_nbytes( ByteSize::kib(4).as_u64() as usize, Some(i as u8), ), user_data: generate_stable_nbytes( ByteSize::mib(4).as_u64() as usize, Some(i as u8), ), attachments: vec![], siwe_backup_msg: None, }); } + let device_info_1 = register_user_device(None, Some(DeviceType::Ios)).await; + let device_info_2 = register_user_device(None, Some(DeviceType::Ios)).await; + let user_identities = [ UserIdentity { - user_id: "1".to_string(), - access_token: "dummy access token".to_string(), - device_id: "dummy device_id".to_string(), + user_id: device_info_1.user_id.clone(), + access_token: device_info_1.access_token, + device_id: device_info_1.device_id, }, UserIdentity { - user_id: "2".to_string(), - access_token: "dummy access token".to_string(), - device_id: "dummy device_id".to_string(), + user_id: device_info_2.user_id.clone(), + access_token: device_info_2.access_token, + device_id: device_info_2.device_id, }, ]; tokio::task::spawn_blocking(move || { println!("Creating new backups"); rt.block_on(async { let mut set = JoinSet::new(); for (i, item) in backup_data.iter().cloned().enumerate() { let backup_client = backup_client.clone(); let user = user_identities[i % user_identities.len()].clone(); set.spawn(async move { backup_client.upload_backup(&user, item).await.unwrap(); }); } while let Some(result) = set.join_next().await { result.unwrap(); } }); let mut latest_ids_for_user = vec![]; println!("Reading latest ids"); rt.block_on(async { let mut handlers = vec![]; for user in &user_identities { let backup_client = backup_client.clone(); let descriptor = BackupDescriptor::Latest { username: user.user_id.clone(), }; handlers.push(tokio::spawn(async move { let response = backup_client .download_backup_data(&descriptor, RequestedData::BackupID) .await .unwrap(); serde_json::from_slice::(&response).unwrap() })); } for handler in handlers { latest_ids_for_user.push(handler.await.unwrap().backup_id); } }); assert_eq!(latest_ids_for_user.len(), user_identities.len()); let mut latest_user_keys_for_user = vec![]; println!("Reading latest user keys"); rt.block_on(async { let mut handlers = vec![]; for user in &user_identities { let backup_client = backup_client.clone(); let descriptor = BackupDescriptor::Latest { username: user.user_id.clone(), }; handlers.push(tokio::spawn(async move { backup_client .download_backup_data(&descriptor, RequestedData::UserKeys) .await .unwrap() })); } for handler in handlers { latest_user_keys_for_user.push(handler.await.unwrap()); } }); assert_eq!(latest_user_keys_for_user.len(), user_identities.len()); for (backup_id, user_keys) in latest_ids_for_user.iter().zip(latest_user_keys_for_user) { let backup = backup_data .iter() .find(|data| data.backup_id == *backup_id) .expect("Request should return existing backup data"); assert_eq!(backup.user_keys, user_keys); } let mut latest_user_data_for_user = vec![]; println!("Reading latest user data"); rt.block_on(async { let mut handlers = vec![]; for (i, backup_id) in latest_ids_for_user.iter().enumerate() { let backup_client = backup_client.clone(); let descriptor = BackupDescriptor::BackupID { backup_id: backup_id.clone(), user_identity: user_identities[i % user_identities.len()].clone(), }; handlers.push(tokio::spawn(async move { backup_client .download_backup_data(&descriptor, RequestedData::UserData) .await .unwrap() })); } for handler in handlers { latest_user_data_for_user.push(handler.await.unwrap()); } }); assert_eq!(latest_user_data_for_user.len(), user_identities.len()); for (backup_id, user_data) in latest_ids_for_user.iter().zip(latest_user_data_for_user) { let backup = backup_data .iter() .find(|data| data.backup_id == *backup_id) .expect("Request should return existing backup data"); assert_eq!(backup.user_data, user_data); } }) .await .expect("Task panicked"); Ok(()) }