Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3505544
D10534.id35201.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Referenced Files
None
Subscribers
None
D10534.id35201.diff
View Options
diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -1,74 +1,77 @@
+use std::collections::{HashMap, HashSet};
+
use backup_client::{
BackupClient, BackupData, BackupDescriptor, Error as BackupClientError,
- RequestedData,
+ RequestedData, SinkExt, StreamExt, TryStreamExt,
};
use bytesize::ByteSize;
-use comm_lib::{auth::UserIdentity, backup::LatestBackupIDResponse};
+use comm_lib::{
+ auth::UserIdentity,
+ backup::{
+ DownloadLogsRequest, LatestBackupIDResponse, LogWSResponse,
+ UploadLogRequest,
+ },
+};
use commtest::{
service_addr,
tools::{generate_stable_nbytes, Error},
};
use reqwest::StatusCode;
+use uuid::Uuid;
#[tokio::test]
async fn backup_integration_test() -> Result<(), Error> {
let backup_client = BackupClient::new(service_addr::BACKUP_SERVICE_HTTP)?;
- let backup_datas = [
- BackupData {
- backup_id: "b1".to_string(),
- user_keys: generate_stable_nbytes(
- ByteSize::kib(4).as_u64() as usize,
- Some(b'a'),
- ),
- user_data: generate_stable_nbytes(
- ByteSize::mib(4).as_u64() as usize,
- Some(b'A'),
- ),
- attachments: vec![],
- },
- BackupData {
- backup_id: "b2".to_string(),
- user_keys: generate_stable_nbytes(
- ByteSize::kib(4).as_u64() as usize,
- Some(b'b'),
- ),
- user_data: generate_stable_nbytes(
- ByteSize::mib(4).as_u64() as usize,
- Some(b'B'),
- ),
- attachments: vec![],
- },
- ];
-
let user_identity = UserIdentity {
user_id: "1".to_string(),
access_token: "dummy access token".to_string(),
device_id: "dummy device_id".to_string(),
};
- backup_client
- .upload_backup(&user_identity, backup_datas[0].clone())
- .await?;
- backup_client
- .upload_backup(&user_identity, backup_datas[1].clone())
- .await?;
+ let backup_datas = generate_backup_data();
+
+ // Upload backups
+ for (backup_data, log_datas) in &backup_datas {
+ backup_client
+ .upload_backup(&user_identity, backup_data.clone())
+ .await?;
+
+ let (tx, rx) = backup_client
+ .upload_logs(&user_identity, &backup_data.backup_id)
+ .await
+ .unwrap();
+
+ tokio::pin!(tx);
+ tokio::pin!(rx);
+
+ for log_data in log_datas {
+ tx.send(log_data.clone()).await.unwrap();
+ }
+
+ let result: HashSet<usize> =
+ rx.take(log_datas.len()).try_collect().await.unwrap();
+ let expected = log_datas.iter().map(|data| data.log_id).collect();
+ assert_eq!(result, expected);
+ }
// Test direct lookup
+ let (backup_data, log_datas) = &backup_datas[1];
+
let second_backup_descriptor = BackupDescriptor::BackupID {
- backup_id: backup_datas[1].backup_id.clone(),
+ backup_id: backup_data.backup_id.clone(),
user_identity: user_identity.clone(),
};
let user_keys = backup_client
.download_backup_data(&second_backup_descriptor, RequestedData::UserKeys)
.await?;
- assert_eq!(user_keys, backup_datas[1].user_keys);
+ assert_eq!(user_keys, backup_data.user_keys);
let user_data = backup_client
.download_backup_data(&second_backup_descriptor, RequestedData::UserData)
.await?;
- assert_eq!(user_data, backup_datas[1].user_data);
+ assert_eq!(user_data, backup_data.user_data);
// Test latest backup lookup
let latest_backup_descriptor = BackupDescriptor::Latest {
@@ -81,16 +84,63 @@
.await?;
let response: LatestBackupIDResponse =
serde_json::from_slice(&backup_id_response)?;
- assert_eq!(response.backup_id, backup_datas[1].backup_id);
+ assert_eq!(response.backup_id, backup_data.backup_id);
let user_keys = backup_client
.download_backup_data(&latest_backup_descriptor, RequestedData::UserKeys)
.await?;
- assert_eq!(user_keys, backup_datas[1].user_keys);
+ assert_eq!(user_keys, backup_data.user_keys);
+
+ // Test log download
+ let (tx, rx) = backup_client
+ .download_logs(&user_identity, &backup_data.backup_id)
+ .await
+ .unwrap();
+
+ tokio::pin!(tx);
+ tokio::pin!(rx);
+
+ tx.send(DownloadLogsRequest { from_id: None })
+ .await
+ .unwrap();
+
+ let mut downloaded_logs = HashMap::new();
+ 'download: loop {
+ loop {
+ match rx.next().await.unwrap().unwrap() {
+ LogWSResponse::LogDownload {
+ log_id,
+ content,
+ attachments,
+ } => {
+ downloaded_logs.insert(log_id, (content, attachments));
+ }
+ LogWSResponse::LogDownloadFinished { last_log_id } => {
+ if let Some(last_log_id) = last_log_id {
+ tx.send(DownloadLogsRequest {
+ from_id: Some(last_log_id),
+ })
+ .await
+ .unwrap();
+ } else {
+ break 'download;
+ }
+ }
+ msg => panic!("Got response: {msg:?}"),
+ };
+ }
+ }
+ let expected_logs = log_datas
+ .iter()
+ .cloned()
+ .map(|data| (data.log_id, (data.content, data.attachments)))
+ .collect();
+ assert_eq!(downloaded_logs, expected_logs);
// Test cleanup
+ let (cleaned_up_backup, _) = &backup_datas[0];
let first_backup_descriptor = BackupDescriptor::BackupID {
- backup_id: backup_datas[0].backup_id.clone(),
+ backup_id: cleaned_up_backup.backup_id.clone(),
user_identity: user_identity.clone(),
};
@@ -110,3 +160,67 @@
Ok(())
}
+
+fn generate_backup_data() -> [(BackupData, Vec<UploadLogRequest>); 2] {
+ [
+ (
+ BackupData {
+ backup_id: "b1".to_string(),
+ user_keys: generate_stable_nbytes(
+ ByteSize::kib(4).as_u64() as usize,
+ Some(b'a'),
+ ),
+ user_data: generate_stable_nbytes(
+ ByteSize::mib(4).as_u64() as usize,
+ Some(b'A'),
+ ),
+ attachments: vec![],
+ },
+ generate_log_data(b'a'),
+ ),
+ (
+ BackupData {
+ backup_id: "b2".to_string(),
+ user_keys: generate_stable_nbytes(
+ ByteSize::kib(4).as_u64() as usize,
+ Some(b'b'),
+ ),
+ user_data: generate_stable_nbytes(
+ ByteSize::mib(4).as_u64() as usize,
+ Some(b'B'),
+ ),
+ attachments: vec![],
+ },
+ generate_log_data(b'b'),
+ ),
+ ]
+}
+
+fn generate_log_data(value: u8) -> Vec<UploadLogRequest> {
+ const IN_DB_SIZE: usize = ByteSize::kib(4).as_u64() as usize;
+ const IN_BLOB_SIZE: usize = ByteSize::kib(400).as_u64() as usize;
+
+ (1..30)
+ .map(|log_id| {
+ let size = if log_id % 2 == 0 {
+ IN_DB_SIZE
+ } else {
+ IN_BLOB_SIZE
+ };
+ let attachments = if log_id % 10 == 0 {
+ Some(vec![Uuid::new_v4().to_string()])
+ } else {
+ None
+ };
+ let mut content = generate_stable_nbytes(size, Some(value));
+ let unique_suffix = log_id.to_string();
+ content.extend(unique_suffix.as_bytes());
+
+ UploadLogRequest {
+ log_id,
+ content,
+ attachments,
+ }
+ })
+ .collect()
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 21, 1:45 PM (19 h, 30 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2687211
Default Alt Text
D10534.id35201.diff (7 KB)
Attached To
Mode
D10534: [commtest] Check log logic in backup tests
Attached
Detach File
Event Timeline
Log In to Comment