diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs index 8555737c9..638c73795 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -1,112 +1,226 @@ +use std::collections::{HashMap, HashSet}; + use backup_client::{ BackupClient, BackupData, BackupDescriptor, Error as BackupClientError, - RequestedData, + RequestedData, SinkExt, StreamExt, TryStreamExt, }; use bytesize::ByteSize; -use comm_lib::{auth::UserIdentity, backup::LatestBackupIDResponse}; +use comm_lib::{ + auth::UserIdentity, + backup::{ + DownloadLogsRequest, LatestBackupIDResponse, LogWSResponse, + UploadLogRequest, + }, +}; use commtest::{ service_addr, tools::{generate_stable_nbytes, Error}, }; use reqwest::StatusCode; +use uuid::Uuid; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?; - let backup_datas = [ - BackupData { - backup_id: "b1".to_string(), - user_keys: generate_stable_nbytes( - ByteSize::kib(4).as_u64() as usize, - Some(b'a'), - ), - user_data: generate_stable_nbytes( - ByteSize::mib(4).as_u64() as usize, - Some(b'A'), - ), - attachments: vec![], - }, - BackupData { - backup_id: "b2".to_string(), - user_keys: generate_stable_nbytes( - ByteSize::kib(4).as_u64() as usize, - Some(b'b'), - ), - user_data: generate_stable_nbytes( - ByteSize::mib(4).as_u64() as usize, - Some(b'B'), - ), - attachments: vec![], - }, - ]; - let user_identity = UserIdentity { user_id: "1".to_string(), access_token: "dummy access token".to_string(), device_id: "dummy device_id".to_string(), }; - backup_client - .upload_backup(&user_identity, backup_datas[0].clone()) - .await?; - backup_client - .upload_backup(&user_identity, backup_datas[1].clone()) - .await?; + let backup_datas = generate_backup_data(); + + // Upload backups + for (backup_data, log_datas) in &backup_datas { + backup_client + .upload_backup(&user_identity, backup_data.clone()) + .await?; + + let (tx, rx) = backup_client + .upload_logs(&user_identity, &backup_data.backup_id) + .await + .unwrap(); + + tokio::pin!(tx); + tokio::pin!(rx); + + for log_data in log_datas { + tx.send(log_data.clone()).await.unwrap(); + } + + let result: HashSet = + rx.take(log_datas.len()).try_collect().await.unwrap(); + let expected = log_datas.iter().map(|data| data.log_id).collect(); + assert_eq!(result, expected); + } // Test direct lookup + let (backup_data, log_datas) = &backup_datas[1]; + let second_backup_descriptor = BackupDescriptor::BackupID { - backup_id: backup_datas[1].backup_id.clone(), + backup_id: backup_data.backup_id.clone(), user_identity: user_identity.clone(), }; let user_keys = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserKeys) .await?; - assert_eq!(user_keys, backup_datas[1].user_keys); + assert_eq!(user_keys, backup_data.user_keys); let user_data = backup_client .download_backup_data(&second_backup_descriptor, RequestedData::UserData) .await?; - assert_eq!(user_data, backup_datas[1].user_data); + assert_eq!(user_data, backup_data.user_data); // Test latest backup lookup let latest_backup_descriptor = BackupDescriptor::Latest { // Initial version of the backup service uses `user_id` in place of a username username: "1".to_string(), }; let backup_id_response = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::BackupID) .await?; let response: LatestBackupIDResponse = serde_json::from_slice(&backup_id_response)?; - assert_eq!(response.backup_id, backup_datas[1].backup_id); + assert_eq!(response.backup_id, backup_data.backup_id); let user_keys = backup_client .download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys) .await?; - assert_eq!(user_keys, backup_datas[1].user_keys); + assert_eq!(user_keys, backup_data.user_keys); + + // Test log download + let (tx, rx) = backup_client + .download_logs(&user_identity, &backup_data.backup_id) + .await + .unwrap(); + + tokio::pin!(tx); + tokio::pin!(rx); + + tx.send(DownloadLogsRequest { from_id: None }) + .await + .unwrap(); + + let mut downloaded_logs = HashMap::new(); + 'download: loop { + loop { + match rx.next().await.unwrap().unwrap() { + LogWSResponse::LogDownload { + log_id, + content, + attachments, + } => { + downloaded_logs.insert(log_id, (content, attachments)); + } + LogWSResponse::LogDownloadFinished { last_log_id } => { + if let Some(last_log_id) = last_log_id { + tx.send(DownloadLogsRequest { + from_id: Some(last_log_id), + }) + .await + .unwrap(); + } else { + break 'download; + } + } + msg => panic!("Got response: {msg:?}"), + }; + } + } + let expected_logs = log_datas + .iter() + .cloned() + .map(|data| (data.log_id, (data.content, data.attachments))) + .collect(); + assert_eq!(downloaded_logs, expected_logs); // Test cleanup + let (cleaned_up_backup, _) = &backup_datas[0]; let first_backup_descriptor = BackupDescriptor::BackupID { - backup_id: backup_datas[0].backup_id.clone(), + backup_id: cleaned_up_backup.backup_id.clone(), user_identity: user_identity.clone(), }; let response = backup_client .download_backup_data(&first_backup_descriptor, RequestedData::UserKeys) .await; let Err(BackupClientError::ReqwestError(error)) = response else { panic!("First backup should have been removed, instead got response: {response:?}"); }; assert_eq!( error.status(), Some(StatusCode::NOT_FOUND), "Expected status 'not found'" ); Ok(()) } + +fn generate_backup_data() -> [(BackupData, Vec); 2] { + [ + ( + BackupData { + backup_id: "b1".to_string(), + user_keys: generate_stable_nbytes( + ByteSize::kib(4).as_u64() as usize, + Some(b'a'), + ), + user_data: generate_stable_nbytes( + ByteSize::mib(4).as_u64() as usize, + Some(b'A'), + ), + attachments: vec![], + }, + generate_log_data(b'a'), + ), + ( + BackupData { + backup_id: "b2".to_string(), + user_keys: generate_stable_nbytes( + ByteSize::kib(4).as_u64() as usize, + Some(b'b'), + ), + user_data: generate_stable_nbytes( + ByteSize::mib(4).as_u64() as usize, + Some(b'B'), + ), + attachments: vec![], + }, + generate_log_data(b'b'), + ), + ] +} + +fn generate_log_data(value: u8) -> Vec { + const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize; + const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize; + + (1..30) + .map(|log_id| { + let size = if log_id % 2 == 0 { + IN_DB_SIZE + } else { + IN_BLOB_SIZE + }; + let attachments = if log_id % 10 == 0 { + Some(vec![Uuid::new_v4().to_string()]) + } else { + None + }; + let mut content = generate_stable_nbytes(size, Some(value)); + let unique_suffix = log_id.to_string(); + content.extend(unique_suffix.as_bytes()); + + UploadLogRequest { + log_id, + content, + attachments, + } + }) + .collect() +}