diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -4,6 +4,8 @@ mod backup_utils; #[path = "./backup/create_new_backup.rs"] mod create_new_backup; +#[path = "./backup/send_log.rs"] +mod send_log; #[path = "./lib/tools.rs"] mod tools; @@ -75,6 +77,11 @@ sender_cloned.send((i, id)).unwrap(); })); } + // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv + // The channel is closed when all senders have been dropped, or when close + // is called. The best option here is to clone the sender for every + // thread, drop the original one and let all the clones be dropped when + // going out of scope which is equal to the parent thread's termination. drop(sender); for handler in handlers { @@ -118,6 +125,62 @@ }); // SEND LOG + rt.block_on(async { + println!("performing SEND LOG operations"); + let mut handlers = vec![]; + let (sender, receiver) = channel::<(usize, usize, String)>(); + for (backup_index, backup_item) in backup_data.iter().enumerate() { + let backup_item_cloned = backup_item.clone(); + for log_index in 0..backup_item_cloned.log_items.len() { + let backup_item_recloned = backup_item_cloned.clone(); + let mut client_cloned = client.clone(); + let sender_cloned = sender.clone(); + handlers.push(tokio::spawn(async move { + println!( + "sending log, backup index: [{}] log index: [{}]", + backup_index, log_index + ); + let id = send_log::run( + &mut client_cloned, + &backup_item_recloned, + log_index, + ) + .await + .unwrap(); + assert!(!id.is_empty(), "log id should not be empty after sending"); + sender_cloned.send((backup_index, log_index, id)).unwrap(); + })); + } + } + // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv + // The channel is closed when all senders have been dropped, or when close + // is called. 
The best option here is to clone the sender for every + // task, drop the original one, and let each clone be dropped when the + // task that owns it finishes, so the channel closes once all tasks are done. + drop(sender); + + for handler in handlers { + handler.await.unwrap(); + } + for data in receiver { + println!("received: {:?}", data); + let (backup_index, log_index, id) = data; + backup_data[backup_index].log_items[log_index].id = id; + } + }); + + // check if log IDs are properly set + for (backup_index, backup_item) in backup_data.iter().enumerate() { + for (log_index, log_item) in backup_item.log_items.iter().enumerate() { + assert!( + !log_item.id.is_empty(), + "missing log id for backup index {} and log index {}", + backup_index, + log_index + ); + } + } + // ADD ATTACHMENTS // PULL BACKUP })