diff --git a/services/commtest/tests/backup/add_attachments.rs b/services/commtest/tests/backup/add_attachments.rs index 37ad8965f..df14db8d3 100644 --- a/services/commtest/tests/backup/add_attachments.rs +++ b/services/commtest/tests/backup/add_attachments.rs @@ -1,53 +1,47 @@ -#[path = "./backup_utils.rs"] -mod backup_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use crate::backup_utils::{proto::AddAttachmentsRequest, BackupServiceClient}; - -use tonic::Request; - -use crate::backup_utils::BackupData; +use crate::backup::backup_utils::{ + proto::AddAttachmentsRequest, BackupData, BackupServiceClient, +}; use crate::tools::{Error, ATTACHMENT_DELIMITER}; +use tonic::Request; // log_index = None means that we add attachments to the backup // log_index = Some(x) means that we add attachments to a specific log pub async fn run( client: &mut BackupServiceClient, backup_data: &BackupData, log_index: Option, ) -> Result<(), Error> { let cloned_user_id = backup_data.user_id.clone(); let cloned_backup_id = backup_data.backup_item.id.clone(); let log_id: String = match log_index { Some(index) => { let log_id = backup_data.log_items[index].id.clone(); println!("add attachments for log {}/{}", index, log_id); log_id } None => { println!("add attachments for backup"); String::new() } }; let holders: String = match log_index { Some(log_index) => backup_data.log_items[log_index] .attachments_holders .join(ATTACHMENT_DELIMITER), None => backup_data .backup_item .attachments_holders .join(ATTACHMENT_DELIMITER), }; client .add_attachments(Request::new(AddAttachmentsRequest { user_id: cloned_user_id, backup_id: cloned_backup_id, log_id, holders, })) .await?; Ok(()) } diff --git a/services/commtest/tests/backup/backup_utils.rs b/services/commtest/tests/backup/backup_utils.rs index b016dbaaf..159630498 100644 --- a/services/commtest/tests/backup/backup_utils.rs +++ b/services/commtest/tests/backup/backup_utils.rs @@ -1,106 +1,101 @@ pub mod proto { tonic::include_proto!("backup"); } - pub use proto::backup_service_client::BackupServiceClient; use std::collections::HashMap; // stands for both, backup and log items -#[allow(dead_code)] #[derive(Clone)] pub struct Item { pub id: String, pub chunks_sizes: Vec, pub attachments_holders: Vec, } -#[allow(dead_code)] impl Item { pub fn new( id: String, chunks_sizes: Vec, attachments_holders: Vec, ) -> Item { Item { id, chunks_sizes, attachments_holders, } } } -#[allow(dead_code)] #[derive(Clone)] pub struct BackupData { pub user_id: String, pub device_id: String, pub backup_item: Item, pub log_items: Vec, } -#[allow(dead_code)] pub fn compare_backups(backup_data: &BackupData, result: &BackupData) { // check backup size let expected: usize = backup_data.backup_item.chunks_sizes.iter().sum(); let from_result: usize = result.backup_item.chunks_sizes.iter().sum(); assert_eq!( from_result, expected, "backup sizes do not match, expected {}, got {}", expected, from_result ); // check backup attachments let expected: usize = backup_data.backup_item.attachments_holders.len(); let from_result: usize = result.backup_item.attachments_holders.len(); assert_eq!( from_result, expected, "backup: number of attachments holders do not match, expected {}, got {}", expected, from_result ); // check number of logs let expected: usize = backup_data.log_items.len(); let from_result: usize = result.log_items.len(); assert_eq!( expected, from_result, "backup id {} number of logs do not match, expected {}, got {}", backup_data.backup_item.id, expected, from_result ); // check log sizes // map let mut expected_log_map: HashMap = HashMap::new(); let mut result_log_map: HashMap = HashMap::new(); for i in 0..backup_data.log_items.len() { let expected: usize = backup_data.log_items[i].chunks_sizes.iter().sum(); let insert_result = expected_log_map.insert(backup_data.log_items[i].id.clone(), expected); assert_eq!( insert_result, None, "expected collection contained duplicated log id: {}", backup_data.log_items[i].id ); let from_result: usize = result.log_items[i].chunks_sizes.iter().sum(); let insert_result = result_log_map.insert(result.log_items[i].id.clone(), from_result); assert_eq!( insert_result, None, "expected collection contained duplicated log id: {}", result.log_items[i].id ); } for (expected_id, expected_size) in &expected_log_map { let result_size = result_log_map.get(expected_id).expect(&format!( "comparing logs: expected id found in result: {}", expected_id )); assert_eq!( expected_size, result_size, "comparing logs, sizes don't match, backup {}", backup_data.backup_item.id ); } // todo: check logs attachment holders } diff --git a/services/commtest/tests/backup/create_new_backup.rs b/services/commtest/tests/backup/create_new_backup.rs index 531b80c16..28e2eccbf 100644 --- a/services/commtest/tests/backup/create_new_backup.rs +++ b/services/commtest/tests/backup/create_new_backup.rs @@ -1,76 +1,69 @@ -#[path = "./backup_utils.rs"] -mod backup_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use crate::backup_utils::{ +use crate::backup::backup_utils::{ proto::create_new_backup_request::Data::*, proto::CreateNewBackupRequest, BackupData, BackupServiceClient, }; - -use tonic::Request; - use crate::tools::{generate_stable_nbytes, DataHasher, Error}; +use tonic::Request; pub async fn run( client: &mut BackupServiceClient, backup_data: &BackupData, ) -> Result { println!("create new backup"); let cloned_user_id = backup_data.user_id.clone(); let cloned_device_id = backup_data.device_id.clone(); let cloned_backup_chunk_sizes = backup_data.backup_item.chunks_sizes.clone(); let predefined_byte_value = None; let outbound = async_stream::stream! { println!(" - sending user id"); let request = CreateNewBackupRequest { data: Some(UserId(cloned_user_id)), }; yield request; println!(" - sending device id"); let request = CreateNewBackupRequest { data: Some(DeviceId(cloned_device_id)), }; yield request; println!(" - sending key entropy"); let request = CreateNewBackupRequest { data: Some(KeyEntropy(vec![65,66,67,68])), }; yield request; println!(" - sending data hash"); let mut hasher = DataHasher::new(); for chunk_size in &cloned_backup_chunk_sizes { DataHasher::update(&mut hasher, generate_stable_nbytes(*chunk_size, predefined_byte_value)); } let request = CreateNewBackupRequest { data: Some(NewCompactionHash(hasher.get_hash().as_bytes().to_vec())), }; yield request; for chunk_size in &cloned_backup_chunk_sizes { println!(" - sending data chunk {}", chunk_size); let request = CreateNewBackupRequest { data: Some(NewCompactionChunk(generate_stable_nbytes(*chunk_size, predefined_byte_value))), }; yield request; } }; let mut backup_id: String = String::new(); let response = client.create_new_backup(Request::new(outbound)).await?; let mut inbound = response.into_inner(); while let Some(response) = inbound.message().await? { if !response.backup_id.is_empty() { assert!( backup_id.is_empty(), "backup id should be returned only once" ); backup_id = response.backup_id; } } assert!( !backup_id.is_empty(), "could not get a backup id from the server" ); Ok(backup_id) } diff --git a/services/commtest/tests/backup/mod.rs b/services/commtest/tests/backup/mod.rs new file mode 100644 index 000000000..e80c13acc --- /dev/null +++ b/services/commtest/tests/backup/mod.rs @@ -0,0 +1,5 @@ +pub mod add_attachments; +pub mod backup_utils; +pub mod create_new_backup; +pub mod pull_backup; +pub mod send_log; diff --git a/services/commtest/tests/backup/pull_backup.rs b/services/commtest/tests/backup/pull_backup.rs index 9a4ed6c9c..1e8866cf4 100644 --- a/services/commtest/tests/backup/pull_backup.rs +++ b/services/commtest/tests/backup/pull_backup.rs @@ -1,137 +1,129 @@ -#[path = "./backup_utils.rs"] -mod backup_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use std::io::{Error as IOError, ErrorKind}; -use tonic::Request; - -use crate::backup_utils::{ +use crate::backup::backup_utils::{ proto::pull_backup_response::Data, proto::pull_backup_response::Data::*, proto::pull_backup_response::Id, proto::pull_backup_response::Id::*, - proto::PullBackupRequest, BackupServiceClient, + proto::PullBackupRequest, BackupData, BackupServiceClient, Item, }; - -use crate::backup_utils::{BackupData, Item}; use crate::tools::{Error, ATTACHMENT_DELIMITER}; +use std::io::{Error as IOError, ErrorKind}; +use tonic::Request; #[derive(PartialEq, Debug)] enum State { Compaction, Log, } pub async fn run( client: &mut BackupServiceClient, backup_data: &BackupData, ) -> Result { println!("pull backup"); let cloned_user_id = backup_data.user_id.clone(); let cloned_backup_id = backup_data.backup_item.id.clone(); let mut result = BackupData { user_id: String::new(), device_id: String::new(), backup_item: Item::new(String::new(), Vec::new(), Vec::new()), log_items: Vec::new(), }; let response = client .pull_backup(Request::new(PullBackupRequest { user_id: cloned_user_id, backup_id: cloned_backup_id, })) .await?; let mut inbound = response.into_inner(); let mut state: State = State::Compaction; let mut current_id: String = String::new(); while let Some(response) = inbound.message().await? { let response_data: Option = response.data; let id: Option = response.id; let mut backup_id: Option = None; let mut log_id: Option = None; match id { Some(BackupId(id)) => backup_id = Some(id), Some(LogId(id)) => log_id = Some(id), None => {} }; match response_data { Some(CompactionChunk(chunk)) => { assert_eq!( state, State::Compaction, "invalid state, expected compaction, got {:?}", state ); current_id = backup_id.ok_or(IOError::new( ErrorKind::Other, "backup id expected but not received", ))?; println!( "compaction (id {}), pulling chunk (size: {})", current_id, chunk.len() ); result.backup_item.chunks_sizes.push(chunk.len()) } Some(LogChunk(chunk)) => { if state == State::Compaction { state = State::Log; } assert_eq!( state, State::Log, "invalid state, expected log, got {:?}", state ); let log_id = log_id.ok_or(IOError::new( ErrorKind::Other, "log id expected but not received", ))?; if log_id != current_id { result.log_items.push(Item::new( log_id.clone(), Vec::new(), Vec::new(), )); current_id = log_id; } let log_items_size = result.log_items.len() - 1; result.log_items[log_items_size] .chunks_sizes .push(chunk.len()); println!("log (id {}) chunk size {}", current_id, chunk.len()); } None => {} } if let Some(holders) = response.attachment_holders { let holders_split: Vec<&str> = holders.split(ATTACHMENT_DELIMITER).collect(); if state == State::Compaction { for holder in holders_split { if holder.len() == 0 { continue; } println!("attachments for the backup: {}", holder); result .backup_item .attachments_holders .push(holder.to_string()); } } else if state == State::Log { println!("attachments for the log {:?}: {}", current_id, holders); for holder in holders_split { if holder.len() == 0 { continue; } let log_items_size = result.log_items.len() - 1; result.log_items[log_items_size] .attachments_holders .push(holder.to_string()) } } } } Ok(result) } diff --git a/services/commtest/tests/backup/send_log.rs b/services/commtest/tests/backup/send_log.rs index 666da8d72..ca70a51f2 100644 --- a/services/commtest/tests/backup/send_log.rs +++ b/services/commtest/tests/backup/send_log.rs @@ -1,60 +1,53 @@ -#[path = "./backup_utils.rs"] -mod backup_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use crate::backup_utils::{ - proto::send_log_request::Data::*, proto::SendLogRequest, BackupServiceClient, +use crate::backup::backup_utils::{ + proto::{send_log_request::Data::*, SendLogRequest}, + BackupData, BackupServiceClient, }; - -use tonic::Request; - -use crate::backup_utils::BackupData; use crate::tools::{generate_stable_nbytes, DataHasher, Error}; +use tonic::Request; pub async fn run( client: &mut BackupServiceClient, backup_data: &BackupData, log_index: usize, ) -> Result { println!("send log"); let cloned_user_id = backup_data.user_id.clone(); let cloned_backup_id = backup_data.backup_item.id.clone(); let cloned_log_sizes = backup_data.log_items[log_index].chunks_sizes.clone(); let predefined_byte_value = None; let outbound = async_stream::stream! { println!(" - sending user id"); let request = SendLogRequest { data: Some(UserId(cloned_user_id)), }; yield request; println!(" - sending backup id"); let request = SendLogRequest { data: Some(BackupId(cloned_backup_id)), }; yield request; println!(" - sending log hash"); let mut hasher = DataHasher::new(); for chunk_size in &cloned_log_sizes { DataHasher::update(&mut hasher, generate_stable_nbytes(*chunk_size, predefined_byte_value)); } let request = SendLogRequest { data: Some(LogHash(hasher.get_hash().as_bytes().to_vec())), }; yield request; println!(" - sending log data"); for log_size in &cloned_log_sizes { println!(" - sending log data {}", *log_size); let request = SendLogRequest { data: Some(LogData(generate_stable_nbytes(*log_size, predefined_byte_value))), }; yield request; } }; let response = client.send_log(Request::new(outbound)).await?; let inbound = response.into_inner(); println!("send log response: {:?}", inbound.log_checkpoint); Ok(inbound.log_checkpoint) } diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs index 80d9decbc..9fbf1b17e 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ b/services/commtest/tests/backup_integration_test.rs @@ -1,134 +1,125 @@ -#[path = "./backup/add_attachments.rs"] -mod add_attachments; -#[path = "./backup/backup_utils.rs"] -mod backup_utils; -#[path = "./backup/create_new_backup.rs"] -mod create_new_backup; -#[path = "./backup/pull_backup.rs"] -mod pull_backup; -#[path = "./backup/send_log.rs"] -mod send_log; -#[path = "./lib/tools.rs"] +mod backup; mod tools; -use backup_utils::{BackupData, Item}; +use backup::{ + add_attachments, + backup_utils::{self, BackupData, BackupServiceClient, Item}, + create_new_backup, pull_backup, send_log, +}; use bytesize::ByteSize; use std::collections::HashMap; -use tools::Error; - use std::env; - -use backup_utils::BackupServiceClient; +use tools::Error; #[tokio::test] async fn backup_integration_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BACKUP") .expect("port env var expected but not received"); let mut client = BackupServiceClient::connect(format!("http://localhost:{}", port)).await?; let attachments_fill_size: u64 = 500; let mut backup_data = BackupData { user_id: "user0000".to_string(), device_id: "device0000".to_string(), backup_item: Item::new( String::new(), vec![ByteSize::mib(1).as_u64() as usize; 6], vec![ "holder0".to_string(), "holder1".to_string(), "holder2".to_string(), ], ), log_items: vec![ // the item that almost hits the DB limit, we're going to later add a long // list of attachments, so that causes it to exceed the limit. // In this case its data should be moved to the S3 Item::new( String::new(), vec![ *tools::DYNAMO_DB_ITEM_SIZE_LIMIT - ByteSize::b(attachments_fill_size / 2).as_u64() as usize, ], vec!["holder0".to_string(), "holder1".to_string()], ), // just a small item Item::new( String::new(), vec![ByteSize::b(100).as_u64() as usize], vec!["holder0".to_string()], ), // a big item that should be placed in the S3 right away Item::new( String::new(), vec![*tools::GRPC_CHUNK_SIZE_LIMIT, *tools::GRPC_CHUNK_SIZE_LIMIT], vec![ "holder0".to_string(), "holder1".to_string(), "holder2".to_string(), ], ), ], }; backup_data.backup_item.id = create_new_backup::run(&mut client, &backup_data).await?; println!("backup id in main: {}", backup_data.backup_item.id); add_attachments::run(&mut client, &backup_data, None).await?; for log_index in 0..backup_data.log_items.len() { backup_data.log_items[log_index].id = send_log::run(&mut client, &backup_data, log_index).await?; add_attachments::run(&mut client, &backup_data, Some(log_index)).await?; } let result: BackupData = pull_backup::run(&mut client, &backup_data).await?; backup_utils::compare_backups(&backup_data, &result); // push so many attachments that the log item's data will have to be moved // from the db to the s3 let mut attachments_size = 0; let mut i = backup_data.log_items[0].attachments_holders.len(); let mut new_attachments: Vec = Vec::new(); while attachments_size < (attachments_fill_size as usize) { let att = format!("holder{}", i); attachments_size += att.len(); new_attachments.push(att); i += 1; } let mut old_attachments = backup_data.log_items[0].attachments_holders.clone(); backup_data.log_items[0].attachments_holders = new_attachments; add_attachments::run(&mut client, &backup_data, Some(0)).await?; backup_data.log_items[0] .attachments_holders .append(&mut old_attachments); let result = pull_backup::run(&mut client, &backup_data).await?; // check logs attachments // map let mut expected_log_map: HashMap = HashMap::new(); let mut result_log_map: HashMap = HashMap::new(); for i in 0..backup_data.log_items.len() { let expected: usize = backup_data.log_items[i].attachments_holders.len(); expected_log_map.insert(backup_data.log_items[i].id.clone(), expected); let from_result: usize = result.log_items[i].attachments_holders.len(); result_log_map.insert(result.log_items[i].id.clone(), from_result); } for (expected_id, expected_size) in &expected_log_map { let result_size = result_log_map.get(expected_id).expect(&format!( "comparing logs attachments: expected id found in result: {}", expected_id )); assert_eq!( expected_size, result_size, "comparing logs attachments, sizes don't match, backup {}", backup_data.backup_item.id ); } Ok(()) } diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs index 3ed10544d..430957928 100644 --- a/services/commtest/tests/backup_performance_test.rs +++ b/services/commtest/tests/backup_performance_test.rs @@ -1,242 +1,234 @@ -#[path = "./backup/add_attachments.rs"] -mod add_attachments; -#[path = "./backup/backup_utils.rs"] -mod backup_utils; -#[path = "./backup/create_new_backup.rs"] -mod create_new_backup; -#[path = "./backup/pull_backup.rs"] -mod pull_backup; -#[path = "./backup/send_log.rs"] -mod send_log; -#[path = "./lib/tools.rs"] +mod backup; mod tools; +use backup::{ + add_attachments, + backup_utils::{self, BackupData, BackupServiceClient, Item}, + create_new_backup, pull_backup, send_log, +}; use bytesize::ByteSize; use std::env; use std::sync::mpsc::channel; - use tokio::runtime::Runtime; use tools::{obtain_number_of_threads, Error}; -use backup_utils::{BackupData, BackupServiceClient, Item}; - #[tokio::test] async fn backup_performance_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BACKUP") .expect("port env var expected but not received"); let client = BackupServiceClient::connect(format!("http://localhost:{}", port)).await?; let number_of_threads = obtain_number_of_threads(); println!( "Running performance tests for backup, number of threads: {}", number_of_threads ); let mut backup_data = vec![]; for i in 0..number_of_threads { backup_data.push(BackupData { user_id: format!("user{}", i), device_id: format!("device{}", i), backup_item: Item::new( String::new(), vec![ByteSize::mib(1).as_u64() as usize; 3 + (i % 5)], (0..(i % 5)).map(|x| format!("holder{}", x)).collect(), ), log_items: (0..(i % 4)) .map(|x| { Item::new( String::new(), vec![ByteSize::mib(1).as_u64() as usize; 2 + (x % 2)], (0..(i % 5)).map(|x| format!("holder{}-{}", i, x)).collect(), ) }) .collect(), }); } let rt = Runtime::new().unwrap(); tokio::task::spawn_blocking(move || { // CREATE NEW BACKUP rt.block_on(async { println!("performing CREATE NEW BACKUP operations"); let mut handlers = vec![]; let (sender, receiver) = channel::<(usize, String)>(); for (i, item) in backup_data.iter().enumerate() { let item_cloned = item.clone(); let mut client_cloned = client.clone(); let sender_cloned = sender.clone(); handlers.push(tokio::spawn(async move { let id = create_new_backup::run(&mut client_cloned, &item_cloned) .await .unwrap(); assert!( !id.is_empty(), "backup id should not be empty after creating a new backup" ); sender_cloned.send((i, id)).unwrap(); })); } // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv // The channel is closed when all senders have been dropped, or when close // is called. The best option here is to clone the sender for every // thread, drop the original one and let all the clones be dropped when // going out of scope which is equal to the parent thread's termination. drop(sender); for handler in handlers { handler.await.unwrap(); } for data in receiver { println!("received: {:?}", data); let (index, id) = data; backup_data[index].backup_item.id = id; } }); // check if backup IDs are properly set for (i, item) in backup_data.iter().enumerate() { assert!( !item.backup_item.id.is_empty(), "missing backup id for index {}", i ); } // ADD ATTACHMENTS - BACKUPS rt.block_on(async { println!("performing ADD ATTACHMENTS - BACKUPS operations"); let mut handlers = vec![]; for item in &backup_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { if !item_cloned.backup_item.attachments_holders.is_empty() { add_attachments::run(&mut client_cloned, &item_cloned, None) .await .unwrap(); } })); } for handler in handlers { handler.await.unwrap(); } }); // SEND LOG rt.block_on(async { println!("performing SEND LOG operations"); let mut handlers = vec![]; let (sender, receiver) = channel::<(usize, usize, String)>(); for (backup_index, backup_item) in backup_data.iter().enumerate() { let backup_item_cloned = backup_item.clone(); for log_index in 0..backup_item_cloned.log_items.len() { let backup_item_recloned = backup_item_cloned.clone(); let mut client_cloned = client.clone(); let sender_cloned = sender.clone(); handlers.push(tokio::spawn(async move { println!( "sending log, backup index: [{}] log index: [{}]", backup_index, log_index ); let id = send_log::run( &mut client_cloned, &backup_item_recloned, log_index, ) .await .unwrap(); assert!(!id.is_empty(), "log id should not be empty after sending"); sender_cloned.send((backup_index, log_index, id)).unwrap(); })); } } // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv // The channel is closed when all senders have been dropped, or when close // is called. The best option here is to clone the sender for every // thread, drop the original one and let all the clones be dropped when // going out of scope which is equal to the parent thread's termination. drop(sender); for handler in handlers { handler.await.unwrap(); } for data in receiver { println!("received: {:?}", data); let (backup_index, log_index, id) = data; backup_data[backup_index].log_items[log_index].id = id; } }); // check if log IDs are properly set for (backup_index, backup_item) in backup_data.iter().enumerate() { for (log_index, log_item) in backup_item.log_items.iter().enumerate() { assert!( !log_item.id.is_empty(), "missing log id for backup index {} and log index {}", backup_index, log_index ); } } // ADD ATTACHMENTS - LOGS rt.block_on(async { println!("performing ADD ATTACHMENTS - LOGS operations"); let mut handlers = vec![]; for backup_item in &backup_data { let backup_item_cloned = backup_item.clone(); for log_index in 0..backup_item_cloned.log_items.len() { let backup_item_recloned = backup_item_cloned.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { if !backup_item_recloned .backup_item .attachments_holders .is_empty() { add_attachments::run( &mut client_cloned, &backup_item_recloned, Some(log_index), ) .await .unwrap(); } })); } } for handler in handlers { handler.await.unwrap(); } }); // PULL BACKUP rt.block_on(async { println!("performing PULL BACKUP operations"); let mut handlers = vec![]; for item in backup_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let result = pull_backup::run(&mut client_cloned, &item_cloned) .await .unwrap(); backup_utils::compare_backups(&item_cloned, &result); })); } for handler in handlers { handler.await.unwrap(); } }); }) .await .expect("Task panicked"); Ok(()) } diff --git a/services/commtest/tests/blob/blob_utils.rs b/services/commtest/tests/blob/blob_utils.rs index d5641b12c..ce7ff6998 100644 --- a/services/commtest/tests/blob/blob_utils.rs +++ b/services/commtest/tests/blob/blob_utils.rs @@ -1,13 +1,12 @@ pub mod proto { tonic::include_proto!("blob"); } pub use proto::blob_service_client::BlobServiceClient; -#[allow(dead_code)] #[derive(Clone)] pub struct BlobData { pub holder: String, pub hash: String, pub chunks_sizes: Vec, } diff --git a/services/commtest/tests/blob/get.rs b/services/commtest/tests/blob/get.rs index 7799b0f85..8a086d2b7 100644 --- a/services/commtest/tests/blob/get.rs +++ b/services/commtest/tests/blob/get.rs @@ -1,29 +1,23 @@ -#[path = "./blob_utils.rs"] -mod blob_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use tonic::Request; - -use crate::blob_utils::{proto::GetRequest, BlobData, BlobServiceClient}; +use crate::blob::blob_utils::{proto::GetRequest, BlobData, BlobServiceClient}; use crate::tools::Error; +use tonic::Request; pub async fn run( client: &mut BlobServiceClient, blob_data: &BlobData, ) -> Result, Error> { let cloned_holder = blob_data.holder.clone(); println!("[{}] get", cloned_holder); let response = client .get(Request::new(GetRequest { holder: cloned_holder, })) .await?; let mut inbound = response.into_inner(); let mut sizes: Vec = Vec::new(); while let Some(response) = inbound.message().await? { sizes.push(response.data_chunk.len()); } Ok(sizes) } diff --git a/services/commtest/tests/blob/mod.rs b/services/commtest/tests/blob/mod.rs new file mode 100644 index 000000000..ef17ccfa2 --- /dev/null +++ b/services/commtest/tests/blob/mod.rs @@ -0,0 +1,4 @@ +pub mod blob_utils; +pub mod get; +pub mod put; +pub mod remove; diff --git a/services/commtest/tests/blob/put.rs b/services/commtest/tests/blob/put.rs index fb684f208..de2350e31 100644 --- a/services/commtest/tests/blob/put.rs +++ b/services/commtest/tests/blob/put.rs @@ -1,50 +1,43 @@ -#[path = "./blob_utils.rs"] -mod blob_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use crate::blob_utils::{ +use crate::blob::blob_utils::{ proto::put_request::Data::*, proto::PutRequest, BlobData, BlobServiceClient, }; - -use tonic::Request; - use crate::tools::{generate_stable_nbytes, Error}; +use tonic::Request; pub async fn run( client: &mut BlobServiceClient, blob_data: &BlobData, ) -> Result { let cloned_holder = blob_data.holder.clone(); let cloned_hash = blob_data.hash.clone(); let cloned_chunks_sizes = blob_data.chunks_sizes.clone(); println!("[{}] put", cloned_holder); let outbound = async_stream::stream! { println!("[{}] - sending holder", cloned_holder); let request = PutRequest { data: Some(Holder(cloned_holder.to_string())), }; yield request; println!("[{}] - sending hash", cloned_holder); let request = PutRequest { data: Some(BlobHash(cloned_hash.to_string())), }; yield request; for chunk_size in cloned_chunks_sizes { println!("[{}] - sending data chunk {}", cloned_holder, chunk_size); let request = PutRequest { data: Some(DataChunk(generate_stable_nbytes(chunk_size, None))), }; yield request; } }; let mut data_exists: bool = false; let response = client.put(Request::new(outbound)).await?; let mut inbound = response.into_inner(); while let Some(response) = inbound.message().await? { data_exists = data_exists || response.data_exists; } Ok(data_exists) } diff --git a/services/commtest/tests/blob/remove.rs b/services/commtest/tests/blob/remove.rs index fd4a60b68..62d661bb9 100644 --- a/services/commtest/tests/blob/remove.rs +++ b/services/commtest/tests/blob/remove.rs @@ -1,24 +1,20 @@ -#[path = "./blob_utils.rs"] -mod blob_utils; -#[path = "../lib/tools.rs"] -mod tools; - -use tonic::Request; - -use crate::blob_utils::{proto::RemoveRequest, BlobData, BlobServiceClient}; +use crate::blob::blob_utils::{ + proto::RemoveRequest, BlobData, BlobServiceClient, +}; use crate::tools::Error; +use tonic::Request; pub async fn run( client: &mut BlobServiceClient, blob_data: &BlobData, ) -> Result<(), Error> { let cloned_holder = blob_data.holder.clone(); println!("[{}] remove", cloned_holder); client .remove(Request::new(RemoveRequest { holder: cloned_holder, })) .await?; Ok(()) } diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs index bff9056d7..5aeb1ce67 100644 --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -1,80 +1,74 @@ -#[path = "./blob/blob_utils.rs"] -mod blob_utils; -#[path = "./blob/get.rs"] -mod get; -#[path = "./blob/put.rs"] -mod put; -#[path = "./blob/remove.rs"] -mod remove; -#[path = "./lib/tools.rs"] +mod blob; mod tools; +use blob::{ + blob_utils::{BlobData, BlobServiceClient}, + get, put, remove, +}; use bytesize::ByteSize; use std::env; - -use blob_utils::{BlobData, BlobServiceClient}; use tools::Error; #[tokio::test] async fn blob_integration_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BLOB") .expect("port env var expected but not received"); let mut client = BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; let blob_data = vec![ BlobData { holder: "test_holder001".to_string(), hash: "test_hash001".to_string(), chunks_sizes: vec![ ByteSize::b(100).as_u64() as usize, ByteSize::b(100).as_u64() as usize, ByteSize::b(100).as_u64() as usize, ], }, BlobData { holder: "test_holder002".to_string(), hash: "test_hash002".to_string(), chunks_sizes: vec![ *tools::GRPC_CHUNK_SIZE_LIMIT, *tools::GRPC_CHUNK_SIZE_LIMIT, ByteSize::b(10).as_u64() as usize, ], }, BlobData { holder: "test_holder003".to_string(), hash: "test_hash003".to_string(), chunks_sizes: vec![ *tools::GRPC_CHUNK_SIZE_LIMIT, ByteSize::b(100).as_u64() as usize, *tools::GRPC_CHUNK_SIZE_LIMIT, ], }, ]; for item in &blob_data { let data_exists: bool = put::run(&mut client, &item).await?; assert!(!data_exists, "test data should not exist"); } for (i, blob_item) in blob_data.iter().enumerate() { let received_sizes = get::run(&mut client, &blob_item).await?; let expected_data_size = blob_item.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( expected_data_size, received_data_size, "invalid size of data for index {}, expected {}, got {}", i, expected_data_size, received_data_size ); } for item in &blob_data { remove::run(&mut client, &item).await?; assert!( get::run(&mut client, &item).await.is_err(), "item should no longer be available" ); } Ok(()) } diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs index 9e87f184c..e232fb5cb 100644 --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -1,123 +1,116 @@ -#[path = "./blob/blob_utils.rs"] -mod blob_utils; -#[path = "./blob/get.rs"] -mod get; -#[path = "./blob/put.rs"] -mod put; -#[path = "./blob/remove.rs"] -mod remove; -#[path = "./lib/tools.rs"] +mod blob; mod tools; +use blob::{ + blob_utils::{BlobData, BlobServiceClient}, + get, put, remove, +}; use bytesize::ByteSize; use std::env; - use tokio::runtime::Runtime; use tools::{obtain_number_of_threads, Error}; -use blob_utils::{BlobData, BlobServiceClient}; - #[tokio::test] async fn blob_performance_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BLOB") .expect("port env var expected but not received"); let client = BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; let number_of_threads = obtain_number_of_threads(); println!( "Running performance tests for blob, number of threads: {}", number_of_threads ); let mut blob_data = vec![]; for i in 0..number_of_threads { let index: u64 = (i as u64) % 10; blob_data.push(BlobData { holder: format!("test_holder_{}", i), hash: format!("test_hash_{}", i), chunks_sizes: vec![ ByteSize::kib(200 + (300 - index * 20)).as_u64() as usize, ByteSize::kib(500 + (400 - index * 20)).as_u64() as usize, ByteSize::kib(700 + (500 - index * 25)).as_u64() as usize, ], }) } let rt = Runtime::new().unwrap(); tokio::task::spawn_blocking(move || { // PUT rt.block_on(async { println!("performing PUT operations"); let mut handlers = vec![]; for item in &blob_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let data_exists: bool = put::run(&mut client_cloned, &item_cloned).await.unwrap(); assert!(!data_exists, "test data should not exist"); })); } for handler in handlers { handler.await.unwrap(); } }); // GET rt.block_on(async { println!("performing GET operations"); let mut handlers = vec![]; for (i, item) in blob_data.iter().enumerate() { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let received_sizes = get::run(&mut client_cloned, &item_cloned).await.unwrap(); let expected_data_size = item_cloned.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( expected_data_size, received_data_size, "invalid size of data for index {}, expected {}, got {}", i, expected_data_size, received_data_size ); })); } for handler in handlers { handler.await.unwrap(); } }); // REMOVE rt.block_on(async { println!("performing REMOVE operations"); let mut handlers = vec![]; for item in &blob_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { remove::run(&mut client_cloned, &item_cloned).await.unwrap(); assert!( get::run(&mut client_cloned, &item_cloned).await.is_err(), "item should no longer be available" ); })); } for handler in handlers { handler.await.unwrap(); } }); }) .await .expect("Task panicked"); Ok(()) } diff --git a/services/commtest/tests/lib/tools.rs b/services/commtest/tests/tools/mod.rs similarity index 93% rename from services/commtest/tests/lib/tools.rs rename to services/commtest/tests/tools/mod.rs index 92a802234..e705f0cb5 100644 --- a/services/commtest/tests/lib/tools.rs +++ b/services/commtest/tests/tools/mod.rs @@ -1,72 +1,68 @@ +#![allow(dead_code)] + use bytesize::ByteSize; +use hex::ToHex; use lazy_static::lazy_static; use num_cpus; -use std::env; - -use hex::ToHex; use sha2::{Digest, Sha512}; +use std::env; -#[allow(dead_code)] pub fn generate_stable_nbytes( number_of_bytes: usize, predefined_byte_value: Option, ) -> Vec { let byte_value = predefined_byte_value.unwrap_or(b'A'); return vec![byte_value; number_of_bytes]; } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] Proto(std::io::Error), #[display(...)] Tonic(tonic::transport::Error), #[display(...)] TonicStatus(tonic::Status), } pub const GRPC_METADATA_SIZE_BYTES: usize = 5; lazy_static! { pub static ref DYNAMO_DB_ITEM_SIZE_LIMIT: usize = ByteSize::kib(400).as_u64() as usize; pub static ref GRPC_CHUNK_SIZE_LIMIT: usize = (ByteSize::mib(4).as_u64() as usize) - GRPC_METADATA_SIZE_BYTES; } -#[allow(dead_code)] pub const ATTACHMENT_DELIMITER: &str = ";"; -#[allow(dead_code)] pub fn obtain_number_of_threads() -> usize { let number_of_threads_str: String = env::var("COMM_NUMBER_OF_THREADS").unwrap(); if number_of_threads_str.is_empty() { return num_cpus::get(); } return number_of_threads_str.parse::().unwrap(); } -#[allow(dead_code)] pub struct DataHasher { hasher: Sha512, } -#[allow(dead_code)] impl DataHasher { pub fn new() -> DataHasher { return DataHasher { hasher: Sha512::new(), }; } pub fn update(data_hasher: &mut DataHasher, bytes: Vec) { data_hasher.hasher.update(bytes); } pub fn get_hash(self) -> String { let hash = self.hasher.finalize(); return hash.encode_hex::(); } } diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs index d4bd4e5d0..ee7a750ba 100644 --- a/services/commtest/tests/tunnelbroker_integration_test.rs +++ b/services/commtest/tests/tunnelbroker_integration_test.rs @@ -1,7 +1,6 @@ -#[path = "./lib/tools.rs"] mod tools; #[tokio::test] async fn tunnelbroker_integration_test() { assert!(false, "not implemented"); }