diff --git a/services/commtest/tests/backup/backup_utils.rs b/services/commtest/tests/backup/backup_utils.rs
--- a/services/commtest/tests/backup/backup_utils.rs
+++ b/services/commtest/tests/backup/backup_utils.rs
@@ -6,6 +6,7 @@
 
 // stands for both, backup and log items
 #[allow(dead_code)]
+#[derive(Clone)]
 pub struct Item {
   pub id: String,
   pub chunks_sizes: Vec<usize>,
@@ -28,6 +29,7 @@
 }
 
 #[allow(dead_code)]
+#[derive(Clone)]
 pub struct BackupData {
   pub user_id: String,
   pub device_id: String,
diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs
--- a/services/commtest/tests/backup_performance_test.rs
+++ b/services/commtest/tests/backup_performance_test.rs
@@ -1,7 +1,124 @@
+#[path = "./backup/backup_utils.rs"]
+mod backup_utils;
+#[path = "./backup/create_new_backup.rs"]
+mod create_new_backup;
 #[path = "./lib/tools.rs"]
 mod tools;
 
+use bytesize::ByteSize;
+use std::env;
+use std::sync::mpsc::channel;
+
+use tokio::runtime::Runtime;
+use tools::Error;
+
+use backup_utils::{BackupData, BackupServiceClient, Item};
+
+/// Performance test for the backup service: creates one backup per worker
+/// concurrently and checks that every worker got a non-empty backup id.
+///
+/// Reads `COMM_SERVICES_PORT_BACKUP` (required) for the service port and
+/// `COMM_NUMBER_OF_THREADS` (optional) for the number of workers; when the
+/// latter is unset or empty, `num_cpus::get()` is used instead.
 #[tokio::test]
-async fn backup_performance_test() {
-  assert!(false, "not implemented");
+async fn backup_performance_test() -> Result<(), Error> {
+  let port = env::var("COMM_SERVICES_PORT_BACKUP")
+    .expect("port env var expected but not received");
+  let client =
+    BackupServiceClient::connect(format!("http://localhost:{}", port)).await?;
+
+  // Only override the default when the variable is set to a non-empty value;
+  // a missing variable must not abort the test. A malformed value still
+  // panics so that a typo is not silently ignored.
+  let number_of_threads = match env::var("COMM_NUMBER_OF_THREADS") {
+    Ok(value) if !value.is_empty() => value
+      .parse::<usize>()
+      .expect("COMM_NUMBER_OF_THREADS should be a non-negative integer"),
+    _ => num_cpus::get(),
+  };
+
+  println!(
+    "Running performance tests for backup, number of threads: {}",
+    number_of_threads
+  );
+
+  let mut backup_data = vec![];
+
+  // Vary the workload per worker: 3 to 7 entries of 1 MiB for the backup
+  // item, up to 4 holders and up to 3 log items (2 or 3 entries each).
+  for i in 0..number_of_threads {
+    backup_data.push(BackupData {
+      user_id: format!("user{}", i),
+      device_id: format!("device{}", i),
+      backup_item: Item::new(
+        String::new(),
+        vec![ByteSize::mib(1).as_u64() as usize; 3 + (i % 5)],
+        (0..(i % 5)).map(|x| format!("holder{}", x)).collect(),
+      ),
+      log_items: (0..(i % 4))
+        .map(|x| {
+          Item::new(
+            String::new(),
+            vec![ByteSize::mib(1).as_u64() as usize; 2 + (x % 2)],
+            (0..(i % 5)).map(|x| format!("holder{}-{}", i, x)).collect(),
+          )
+        })
+        .collect(),
+    });
+  }
+
+  // A dedicated runtime is driven from a blocking thread because `block_on`
+  // must not be called from within an asynchronous context.
+  let rt = Runtime::new().unwrap();
+  tokio::task::spawn_blocking(move || {
+    // CREATE NEW BACKUP
+    rt.block_on(async {
+      println!("performing CREATE NEW BACKUP operations");
+      let mut handlers = vec![];
+      let (sender, receiver) = channel::<(usize, String)>();
+      for (i, item) in backup_data.iter().enumerate() {
+        let item_cloned = item.clone();
+        let mut client_cloned = client.clone();
+        let sender_cloned = sender.clone();
+        handlers.push(tokio::spawn(async move {
+          let id = create_new_backup::run(&mut client_cloned, &item_cloned)
+            .await
+            .unwrap();
+          assert!(
+            !id.is_empty(),
+            "backup id should not be empty after creating a new backup"
+          );
+          sender_cloned.send((i, id)).unwrap();
+        }));
+      }
+      // Drop the original sender so the receiver loop below terminates once
+      // every per-task clone has been dropped.
+      drop(sender);
+
+      for handler in handlers {
+        handler.await.unwrap();
+      }
+      for data in receiver {
+        println!("received: {:?}", data);
+        let (index, id) = data;
+        backup_data[index].backup_item.id = id;
+      }
+    });
+
+    for (i, item) in backup_data.iter().enumerate() {
+      assert!(
+        !item.backup_item.id.is_empty(),
+        "missing backup id for index {}",
+        i
+      );
+    }
+
+    // SEND LOG
+    // ADD ATTACHMENTS
+    // PULL BACKUP
+  })
+  .await
+  .expect("Task panicked");
+
+  Ok(())
 }