Page MenuHomePhabricator

D4633.id15058.diff
No OneTemporary

D4633.id15058.diff

diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs
--- a/services/commtest/tests/backup_performance_test.rs
+++ b/services/commtest/tests/backup_performance_test.rs
@@ -4,6 +4,8 @@
mod backup_utils;
#[path = "./backup/create_new_backup.rs"]
mod create_new_backup;
+#[path = "./backup/send_log.rs"]
+mod send_log;
#[path = "./lib/tools.rs"]
mod tools;
@@ -75,6 +77,11 @@
sender_cloned.send((i, id)).unwrap();
}));
}
+ // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv
+ // The channel is closed when all senders have been dropped, or when close
+ // is called. The best option here is to clone the sender for every
+ // thread, drop the original one and let all the clones be dropped when
+ // going out of scope which is equal to the parent thread's termination.
drop(sender);
for handler in handlers {
@@ -118,6 +125,62 @@
});
// SEND LOG
+ rt.block_on(async {
+ println!("performing SEND LOG operations");
+ let mut handlers = vec![];
+ let (sender, receiver) = channel::<(usize, usize, String)>();
+ for (backup_index, backup_item) in backup_data.iter().enumerate() {
+ let backup_item_cloned = backup_item.clone();
+ for log_index in 0..backup_item_cloned.log_items.len() {
+ let backup_item_recloned = backup_item_cloned.clone();
+ let mut client_cloned = client.clone();
+ let sender_cloned = sender.clone();
+ handlers.push(tokio::spawn(async move {
+ println!(
+ "sending log, backup index: [{}] log index: [{}]",
+ backup_index, log_index
+ );
+ let id = send_log::run(
+ &mut client_cloned,
+ &backup_item_recloned,
+ log_index,
+ )
+ .await
+ .unwrap();
+ assert!(!id.is_empty(), "log id should not be empty after sending");
+ sender_cloned.send((backup_index, log_index, id)).unwrap();
+ }));
+ }
+ }
+ // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv
+ // The channel is closed when all senders have been dropped, or when close
+ // is called. The best option here is to clone the sender for every
+ // thread, drop the original one and let all the clones be dropped when
+ // going out of scope which is equal to the parent thread's termination.
+ drop(sender);
+
+ for handler in handlers {
+ handler.await.unwrap();
+ }
+ for data in receiver {
+ println!("received: {:?}", data);
+ let (backup_index, log_index, id) = data;
+ backup_data[backup_index].log_items[log_index].id = id;
+ }
+ });
+
+ // check if log IDs are properly set
+ for (backup_index, backup_item) in backup_data.iter().enumerate() {
+ for (log_index, log_item) in backup_item.log_items.iter().enumerate() {
+ assert!(
+ !log_item.id.is_empty(),
+ "missing log id for backup index {} and log index {}",
+ backup_index,
+ log_index
+ );
+ }
+ }
+
// ADD ATTACHMENTS
// PULL BACKUP
})

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 21, 9:20 PM (21 h, 26 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2688676
Default Alt Text
D4633.id15058.diff (3 KB)

Event Timeline