D10534.id35201.diff

diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -1,74 +1,77 @@
+use std::collections::{HashMap, HashSet};
+
use backup_client::{
BackupClient, BackupData, BackupDescriptor, Error as BackupClientError,
- RequestedData,
+ RequestedData, SinkExt, StreamExt, TryStreamExt,
};
use bytesize::ByteSize;
-use comm_lib::{auth::UserIdentity, backup::LatestBackupIDResponse};
+use comm_lib::{
+ auth::UserIdentity,
+ backup::{
+ DownloadLogsRequest, LatestBackupIDResponse, LogWSResponse,
+ UploadLogRequest,
+ },
+};
use commtest::{
service_addr,
tools::{generate_stable_nbytes, Error},
};
use reqwest::StatusCode;
+use uuid::Uuid;
#[tokio::test]
async fn backup_integration_test() -> Result<(), Error> {
let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?;
- let backup_datas = [
- BackupData {
- backup_id: "b1".to_string(),
- user_keys: generate_stable_nbytes(
- ByteSize::kib(4).as_u64() as usize,
- Some(b'a'),
- ),
- user_data: generate_stable_nbytes(
- ByteSize::mib(4).as_u64() as usize,
- Some(b'A'),
- ),
- attachments: vec![],
- },
- BackupData {
- backup_id: "b2".to_string(),
- user_keys: generate_stable_nbytes(
- ByteSize::kib(4).as_u64() as usize,
- Some(b'b'),
- ),
- user_data: generate_stable_nbytes(
- ByteSize::mib(4).as_u64() as usize,
- Some(b'B'),
- ),
- attachments: vec![],
- },
- ];
-
let user_identity = UserIdentity {
user_id: "1".to_string(),
access_token: "dummy access token".to_string(),
device_id: "dummy device_id".to_string(),
};
- backup_client
- .upload_backup(&user_identity, backup_datas[0].clone())
- .await?;
- backup_client
- .upload_backup(&user_identity, backup_datas[1].clone())
- .await?;
+ let backup_datas = generate_backup_data();
+
+ // Upload backups
+ for (backup_data, log_datas) in &backup_datas {
+ backup_client
+ .upload_backup(&user_identity, backup_data.clone())
+ .await?;
+
+ let (tx, rx) = backup_client
+ .upload_logs(&user_identity, &backup_data.backup_id)
+ .await
+ .unwrap();
+
+ tokio::pin!(tx);
+ tokio::pin!(rx);
+
+ for log_data in log_datas {
+ tx.send(log_data.clone()).await.unwrap();
+ }
+
+ let result: HashSet<usize> =
+ rx.take(log_datas.len()).try_collect().await.unwrap();
+ let expected = log_datas.iter().map(|data| data.log_id).collect();
+ assert_eq!(result, expected);
+ }
// Test direct lookup
+ let (backup_data, log_datas) = &backup_datas[1];
+
let second_backup_descriptor = BackupDescriptor::BackupID {
- backup_id: backup_datas[1].backup_id.clone(),
+ backup_id: backup_data.backup_id.clone(),
user_identity: user_identity.clone(),
};
let user_keys = backup_client
.download_backup_data(&second_backup_descriptor, RequestedData::UserKeys)
.await?;
- assert_eq!(user_keys, backup_datas[1].user_keys);
+ assert_eq!(user_keys, backup_data.user_keys);
let user_data = backup_client
.download_backup_data(&second_backup_descriptor, RequestedData::UserData)
.await?;
- assert_eq!(user_data, backup_datas[1].user_data);
+ assert_eq!(user_data, backup_data.user_data);
// Test latest backup lookup
let latest_backup_descriptor = BackupDescriptor::Latest {
@@ -81,16 +84,63 @@
.await?;
let response: LatestBackupIDResponse =
serde_json::from_slice(&backup_id_response)?;
- assert_eq!(response.backup_id, backup_datas[1].backup_id);
+ assert_eq!(response.backup_id, backup_data.backup_id);
let user_keys = backup_client
.download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys)
.await?;
- assert_eq!(user_keys, backup_datas[1].user_keys);
+ assert_eq!(user_keys, backup_data.user_keys);
+
+ // Test log download
+ let (tx, rx) = backup_client
+ .download_logs(&user_identity, &backup_data.backup_id)
+ .await
+ .unwrap();
+
+ tokio::pin!(tx);
+ tokio::pin!(rx);
+
+ tx.send(DownloadLogsRequest { from_id: None })
+ .await
+ .unwrap();
+
+ let mut downloaded_logs = HashMap::new();
+ 'download: loop {
+ loop {
+ match rx.next().await.unwrap().unwrap() {
+ LogWSResponse::LogDownload {
+ log_id,
+ content,
+ attachments,
+ } => {
+ downloaded_logs.insert(log_id, (content, attachments));
+ }
+ LogWSResponse::LogDownloadFinished { last_log_id } => {
+ if let Some(last_log_id) = last_log_id {
+ tx.send(DownloadLogsRequest {
+ from_id: Some(last_log_id),
+ })
+ .await
+ .unwrap();
+ } else {
+ break 'download;
+ }
+ }
+ msg => panic!("Got response: {msg:?}"),
+ };
+ }
+ }
+ let expected_logs = log_datas
+ .iter()
+ .cloned()
+ .map(|data| (data.log_id, (data.content, data.attachments)))
+ .collect();
+ assert_eq!(downloaded_logs, expected_logs);
// Test cleanup
+ let (cleaned_up_backup, _) = &backup_datas[0];
let first_backup_descriptor = BackupDescriptor::BackupID {
- backup_id: backup_datas[0].backup_id.clone(),
+ backup_id: cleaned_up_backup.backup_id.clone(),
user_identity: user_identity.clone(),
};
@@ -110,3 +160,67 @@
Ok(())
}
+
+fn generate_backup_data() -> [(BackupData, Vec<UploadLogRequest>); 2] {
+ [
+ (
+ BackupData {
+ backup_id: "b1".to_string(),
+ user_keys: generate_stable_nbytes(
+ ByteSize::kib(4).as_u64() as usize,
+ Some(b'a'),
+ ),
+ user_data: generate_stable_nbytes(
+ ByteSize::mib(4).as_u64() as usize,
+ Some(b'A'),
+ ),
+ attachments: vec![],
+ },
+ generate_log_data(b'a'),
+ ),
+ (
+ BackupData {
+ backup_id: "b2".to_string(),
+ user_keys: generate_stable_nbytes(
+ ByteSize::kib(4).as_u64() as usize,
+ Some(b'b'),
+ ),
+ user_data: generate_stable_nbytes(
+ ByteSize::mib(4).as_u64() as usize,
+ Some(b'B'),
+ ),
+ attachments: vec![],
+ },
+ generate_log_data(b'b'),
+ ),
+ ]
+}
+
+fn generate_log_data(value: u8) -> Vec<UploadLogRequest> {
+ const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize;
+ const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize;
+
+ (1..30)
+ .map(|log_id| {
+ let size = if log_id % 2 == 0 {
+ IN_DB_SIZE
+ } else {
+ IN_BLOB_SIZE
+ };
+ let attachments = if log_id % 10 == 0 {
+ Some(vec![Uuid::new_v4().to_string()])
+ } else {
+ None
+ };
+ let mut content = generate_stable_nbytes(size, Some(value));
+ let unique_suffix = log_id.to_string();
+ content.extend(unique_suffix.as_bytes());
+
+ UploadLogRequest {
+ log_id,
+ content,
+ attachments,
+ }
+ })
+ .collect()
+}
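
Note on the upload handshake exercised above: upload_logs hands back a sink/stream pair, and the server acknowledges each UploadLogRequest on the stream with the log_id it persisted, so collecting exactly log_datas.len() acknowledgements and comparing id sets confirms every log was stored. A minimal sketch of that handshake factored into a standalone helper (hypothetical; it assumes the pair satisfies the standard futures_util Sink/Stream traits, which the test's backup_client re-exports of SinkExt/StreamExt/TryStreamExt appear to mirror, and that acknowledgements arrive as Result<usize, E> items, as the test's try_collect suggests):

use std::collections::HashSet;

use comm_lib::backup::UploadLogRequest;
use futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};

// Hypothetical helper mirroring the test's upload loop: send every log,
// then wait for one acknowledgement per log and check the id sets match.
async fn upload_and_confirm<Tx, Rx, E>(mut tx: Tx, rx: Rx, logs: &[UploadLogRequest])
where
  Tx: Sink<UploadLogRequest> + Unpin,
  Tx::Error: std::fmt::Debug,
  Rx: Stream<Item = Result<usize, E>> + Unpin,
  E: std::fmt::Debug,
{
  for log in logs {
    tx.send(log.clone()).await.unwrap();
  }
  // One acknowledgement per uploaded log; order is not guaranteed, hence sets.
  let acked: HashSet<usize> = rx.take(logs.len()).try_collect().await.unwrap();
  let expected: HashSet<usize> = logs.iter().map(|l| l.log_id).collect();
  assert_eq!(acked, expected);
}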
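
The labeled 'download loop in the patch is cursor-based pagination over the WebSocket: the client sends a DownloadLogsRequest, collects LogWSResponse::LogDownload messages until a LogDownloadFinished arrives, and re-requests from last_log_id while the server reports one. A sketch of the same protocol factored into a helper (again hypothetical, under the same futures-trait assumptions, with content as Vec<u8> and attachments as Option<Vec<String>> as implied by the test's equality checks):

use std::collections::HashMap;

use comm_lib::backup::{DownloadLogsRequest, LogWSResponse};
use futures_util::{Sink, SinkExt, Stream, StreamExt};

// Hypothetical helper: drives the download cursor to completion and
// returns all received logs keyed by log_id.
async fn download_all_logs<Tx, Rx, E>(
  mut tx: Tx,
  mut rx: Rx,
) -> HashMap<usize, (Vec<u8>, Option<Vec<String>>)>
where
  Tx: Sink<DownloadLogsRequest> + Unpin,
  Tx::Error: std::fmt::Debug,
  Rx: Stream<Item = Result<LogWSResponse, E>> + Unpin,
  E: std::fmt::Debug,
{
  let mut logs = HashMap::new();
  let mut from_id = None;
  loop {
    tx.send(DownloadLogsRequest { from_id }).await.unwrap();
    loop {
      match rx.next().await.unwrap().unwrap() {
        LogWSResponse::LogDownload { log_id, content, attachments } => {
          logs.insert(log_id, (content, attachments));
        }
        // The server paginates: resume from the last id it reported.
        LogWSResponse::LogDownloadFinished { last_log_id: Some(id) } => {
          from_id = Some(id);
          break;
        }
        LogWSResponse::LogDownloadFinished { last_log_id: None } => return logs,
        msg => panic!("unexpected response: {msg:?}"),
      }
    }
  }
}

With such a helper, the test's labeled loop could reduce to a single call, e.g. assert_eq!(download_all_logs(tx, rx).await, expected_logs); the pinned pair produced by tokio::pin! still satisfies the bounds, since Pin<&mut T> forwards Sink and Stream and is always Unpin.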
