diff --git a/services/commtest/tests/backup/add_attachments.rs b/services/commtest/src/backup/add_attachments.rs
similarity index 100%
rename from services/commtest/tests/backup/add_attachments.rs
rename to services/commtest/src/backup/add_attachments.rs
diff --git a/services/commtest/tests/backup/backup_utils.rs b/services/commtest/src/backup/backup_utils.rs
similarity index 100%
rename from services/commtest/tests/backup/backup_utils.rs
rename to services/commtest/src/backup/backup_utils.rs
diff --git a/services/commtest/tests/backup/create_new_backup.rs b/services/commtest/src/backup/create_new_backup.rs
similarity index 100%
rename from services/commtest/tests/backup/create_new_backup.rs
rename to services/commtest/src/backup/create_new_backup.rs
diff --git a/services/commtest/tests/backup/mod.rs b/services/commtest/src/backup/mod.rs
similarity index 100%
rename from services/commtest/tests/backup/mod.rs
rename to services/commtest/src/backup/mod.rs
diff --git a/services/commtest/tests/backup/pull_backup.rs b/services/commtest/src/backup/pull_backup.rs
similarity index 100%
rename from services/commtest/tests/backup/pull_backup.rs
rename to services/commtest/src/backup/pull_backup.rs
diff --git a/services/commtest/tests/backup/send_log.rs b/services/commtest/src/backup/send_log.rs
similarity index 100%
rename from services/commtest/tests/backup/send_log.rs
rename to services/commtest/src/backup/send_log.rs
diff --git a/services/commtest/tests/blob/blob_utils.rs b/services/commtest/src/blob/blob_utils.rs
similarity index 100%
rename from services/commtest/tests/blob/blob_utils.rs
rename to services/commtest/src/blob/blob_utils.rs
diff --git a/services/commtest/tests/blob/get.rs b/services/commtest/src/blob/get.rs
similarity index 100%
rename from services/commtest/tests/blob/get.rs
rename to services/commtest/src/blob/get.rs
diff --git a/services/commtest/tests/blob/mod.rs b/services/commtest/src/blob/mod.rs
similarity index 100%
rename from services/commtest/tests/blob/mod.rs
rename to services/commtest/src/blob/mod.rs
diff --git a/services/commtest/tests/blob/put.rs b/services/commtest/src/blob/put.rs
similarity index 100%
rename from services/commtest/tests/blob/put.rs
rename to services/commtest/src/blob/put.rs
diff --git a/services/commtest/tests/blob/remove.rs b/services/commtest/src/blob/remove.rs
similarity index 100%
rename from services/commtest/tests/blob/remove.rs
rename to services/commtest/src/blob/remove.rs
diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs
new file mode 100644
index 000000000..d58cc0f78
--- /dev/null
+++ b/services/commtest/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod backup;
+pub mod blob;
+pub mod tools;
diff --git a/services/commtest/src/main.rs b/services/commtest/src/main.rs
deleted file mode 100644
index 8ecbb9046..000000000
--- a/services/commtest/src/main.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-fn main() {
-  unimplemented!();
-}
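Note: the block above converts commtest from a binary crate into a library. The shared backup, blob, and tools helpers move under src/, the new lib.rs exports them, and the placeholder main.rs goes away, so integration tests under tests/ can import the helpers through the crate name instead of re-declaring them with mod. A minimal sketch of what a test file looks like after this change; the test body is illustrative only, and the crate's Cargo.toml is not part of this diff, so any binary-target cleanup it may need is not shown here.

// Sketch: an integration test in services/commtest/tests/ pulls shared
// helpers through the crate name instead of `mod tools;` / `mod backup;`.
use commtest::tools::{self, Error};

#[tokio::test]
async fn example_test() -> Result<(), Error> {
  // `tools` is the library module defined in src/tools.rs below.
  let threads = tools::obtain_number_of_threads();
  assert!(threads > 0);
  Ok(())
}
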
diff --git a/services/commtest/tests/tools/mod.rs b/services/commtest/src/tools.rs
similarity index 98%
rename from services/commtest/tests/tools/mod.rs
rename to services/commtest/src/tools.rs
index e705f0cb5..fe0d76490 100644
--- a/services/commtest/tests/tools/mod.rs
+++ b/services/commtest/src/tools.rs
@@ -1,68 +1,66 @@
-#![allow(dead_code)]
-
 use bytesize::ByteSize;
 use hex::ToHex;
 use lazy_static::lazy_static;
 use num_cpus;
 use sha2::{Digest, Sha512};
 use std::env;
 
 pub fn generate_stable_nbytes(
   number_of_bytes: usize,
   predefined_byte_value: Option<u8>,
 ) -> Vec<u8> {
   let byte_value = predefined_byte_value.unwrap_or(b'A');
   return vec![byte_value; number_of_bytes];
 }
 
 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum Error {
   #[display(...)]
   Proto(std::io::Error),
   #[display(...)]
   Tonic(tonic::transport::Error),
   #[display(...)]
   TonicStatus(tonic::Status),
 }
 
 pub const GRPC_METADATA_SIZE_BYTES: usize = 5;
 
 lazy_static! {
   pub static ref DYNAMO_DB_ITEM_SIZE_LIMIT: usize =
     ByteSize::kib(400).as_u64() as usize;
   pub static ref GRPC_CHUNK_SIZE_LIMIT: usize =
     (ByteSize::mib(4).as_u64() as usize) - GRPC_METADATA_SIZE_BYTES;
 }
 
 pub const ATTACHMENT_DELIMITER: &str = ";";
 
 pub fn obtain_number_of_threads() -> usize {
   let number_of_threads_str: String =
     env::var("COMM_NUMBER_OF_THREADS").unwrap();
   if number_of_threads_str.is_empty() {
     return num_cpus::get();
   }
   return number_of_threads_str.parse::<usize>().unwrap();
 }
 
 pub struct DataHasher {
   hasher: Sha512,
 }
 
 impl DataHasher {
   pub fn new() -> DataHasher {
     return DataHasher {
       hasher: Sha512::new(),
     };
   }
 
   pub fn update(data_hasher: &mut DataHasher, bytes: Vec<u8>) {
     data_hasher.hasher.update(bytes);
   }
 
   pub fn get_hash(self) -> String {
     let hash = self.hasher.finalize();
     return hash.encode_hex::<String>();
   }
 }
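Note: apart from the move, the only change to this module is dropping the crate-level #![allow(dead_code)], which the old copy needed because each test binary used only a subset of the helpers; as public items of a library they no longer trigger that lint. A short usage sketch of the exported helpers, assuming the signatures reconstructed above:

// Sketch: generate deterministic test bytes and hash them with the SHA-512
// helper exposed by commtest::tools.
use commtest::tools::{generate_stable_nbytes, DataHasher};

fn hash_one_mib_of_a_bytes() -> String {
  let bytes = generate_stable_nbytes(1024 * 1024, None); // defaults to b'A'
  let mut hasher = DataHasher::new();
  DataHasher::update(&mut hasher, bytes);
  hasher.get_hash() // lowercase hex of the SHA-512 digest
}
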
diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
index 9fbf1b17e..6a0ef7efb 100644
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -1,125 +1,122 @@
-mod backup;
-mod tools;
-
-use backup::{
+use bytesize::ByteSize;
+use commtest::backup::{
   add_attachments,
   backup_utils::{self, BackupData, BackupServiceClient, Item},
   create_new_backup, pull_backup, send_log,
 };
-use bytesize::ByteSize;
+use commtest::tools::{self, Error};
 use std::collections::HashMap;
 use std::env;
-use tools::Error;
 
 #[tokio::test]
 async fn backup_integration_test() -> Result<(), Error> {
   let port = env::var("COMM_SERVICES_PORT_BACKUP")
     .expect("port env var expected but not received");
   let mut client =
     BackupServiceClient::connect(format!("http://localhost:{}", port)).await?;
 
   let attachments_fill_size: u64 = 500;
 
   let mut backup_data = BackupData {
     user_id: "user0000".to_string(),
     device_id: "device0000".to_string(),
     backup_item: Item::new(
       String::new(),
       vec![ByteSize::mib(1).as_u64() as usize; 6],
       vec![
         "holder0".to_string(),
         "holder1".to_string(),
         "holder2".to_string(),
       ],
     ),
     log_items: vec![
       // the item that almost hits the DB limit, we're going to later add a
       // long list of attachments, so that causes it to exceed the limit.
       // In this case its data should be moved to the S3
       Item::new(
         String::new(),
         vec![
           *tools::DYNAMO_DB_ITEM_SIZE_LIMIT
             - ByteSize::b(attachments_fill_size / 2).as_u64() as usize,
         ],
         vec!["holder0".to_string(), "holder1".to_string()],
       ),
       // just a small item
       Item::new(
         String::new(),
         vec![ByteSize::b(100).as_u64() as usize],
         vec!["holder0".to_string()],
       ),
       // a big item that should be placed in the S3 right away
       Item::new(
         String::new(),
         vec![*tools::GRPC_CHUNK_SIZE_LIMIT, *tools::GRPC_CHUNK_SIZE_LIMIT],
         vec![
           "holder0".to_string(),
           "holder1".to_string(),
           "holder2".to_string(),
         ],
       ),
     ],
   };
 
   backup_data.backup_item.id =
     create_new_backup::run(&mut client, &backup_data).await?;
   println!("backup id in main: {}", backup_data.backup_item.id);
 
   add_attachments::run(&mut client, &backup_data, None).await?;
 
   for log_index in 0..backup_data.log_items.len() {
     backup_data.log_items[log_index].id =
       send_log::run(&mut client, &backup_data, log_index).await?;
     add_attachments::run(&mut client, &backup_data, Some(log_index)).await?;
   }
 
   let result: BackupData = pull_backup::run(&mut client, &backup_data).await?;
   backup_utils::compare_backups(&backup_data, &result);
 
   // push so many attachments that the log item's data will have to be moved
   // from the db to the s3
   let mut attachments_size = 0;
   let mut i = backup_data.log_items[0].attachments_holders.len();
   let mut new_attachments: Vec<String> = Vec::new();
   while attachments_size < (attachments_fill_size as usize) {
     let att = format!("holder{}", i);
     attachments_size += att.len();
     new_attachments.push(att);
     i += 1;
   }
 
   let mut old_attachments =
     backup_data.log_items[0].attachments_holders.clone();
   backup_data.log_items[0].attachments_holders = new_attachments;
   add_attachments::run(&mut client, &backup_data, Some(0)).await?;
   backup_data.log_items[0]
     .attachments_holders
     .append(&mut old_attachments);
 
   let result = pull_backup::run(&mut client, &backup_data).await?;
 
   // check logs attachments
   // map<log_id, number_of_attachments>
   let mut expected_log_map: HashMap<String, usize> = HashMap::new();
   let mut result_log_map: HashMap<String, usize> = HashMap::new();
   for i in 0..backup_data.log_items.len() {
     let expected: usize = backup_data.log_items[i].attachments_holders.len();
     expected_log_map.insert(backup_data.log_items[i].id.clone(), expected);
     let from_result: usize = result.log_items[i].attachments_holders.len();
     result_log_map.insert(result.log_items[i].id.clone(), from_result);
   }
   for (expected_id, expected_size) in &expected_log_map {
     let result_size = result_log_map.get(expected_id).expect(&format!(
       "comparing logs attachments: expected id found in result: {}",
       expected_id
     ));
     assert_eq!(
       expected_size, result_size,
       "comparing logs attachments, sizes don't match, backup {}",
       backup_data.backup_item.id
     );
   }
 
   Ok(())
 }
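Note: for readers tracing the first log item, it is sized to sit just half of attachments_fill_size below the DynamoDB item limit, and the holders appended later total at least attachments_fill_size bytes, so the item has to spill to S3. A sketch of that arithmetic, assuming the holders count toward the same DynamoDB item as the test comment implies:

// Sketch of the size math behind the first log item above
// (DYNAMO_DB_ITEM_SIZE_LIMIT is 400 KiB in tools.rs).
fn log_item_spills_to_s3() -> bool {
  let item_limit: usize = 400 * 1024;
  let attachments_fill_size: usize = 500;
  // the log data is created only half of `attachments_fill_size` under the limit
  let initial_log_size = item_limit - attachments_fill_size / 2;
  // the later add_attachments call appends at least `attachments_fill_size`
  // bytes of holder names, which tips the item over the limit
  initial_log_size + attachments_fill_size > item_limit
}
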
.expect("port env var expected but not received"); let client = BackupServiceClient::connect(format!("http://localhost:{}", port)).await?; let number_of_threads = obtain_number_of_threads(); println!( "Running performance tests for backup, number of threads: {}", number_of_threads ); let mut backup_data = vec![]; for i in 0..number_of_threads { backup_data.push(BackupData { user_id: format!("user{}", i), device_id: format!("device{}", i), backup_item: Item::new( String::new(), vec![ByteSize::mib(1).as_u64() as usize; 3 + (i % 5)], (0..(i % 5)).map(|x| format!("holder{}", x)).collect(), ), log_items: (0..(i % 4)) .map(|x| { Item::new( String::new(), vec![ByteSize::mib(1).as_u64() as usize; 2 + (x % 2)], (0..(i % 5)).map(|x| format!("holder{}-{}", i, x)).collect(), ) }) .collect(), }); } let rt = Runtime::new().unwrap(); tokio::task::spawn_blocking(move || { // CREATE NEW BACKUP rt.block_on(async { println!("performing CREATE NEW BACKUP operations"); let mut handlers = vec![]; let (sender, receiver) = channel::<(usize, String)>(); for (i, item) in backup_data.iter().enumerate() { let item_cloned = item.clone(); let mut client_cloned = client.clone(); let sender_cloned = sender.clone(); handlers.push(tokio::spawn(async move { let id = create_new_backup::run(&mut client_cloned, &item_cloned) .await .unwrap(); assert!( !id.is_empty(), "backup id should not be empty after creating a new backup" ); sender_cloned.send((i, id)).unwrap(); })); } // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv // The channel is closed when all senders have been dropped, or when close // is called. The best option here is to clone the sender for every // thread, drop the original one and let all the clones be dropped when // going out of scope which is equal to the parent thread's termination. 
drop(sender); for handler in handlers { handler.await.unwrap(); } for data in receiver { println!("received: {:?}", data); let (index, id) = data; backup_data[index].backup_item.id = id; } }); // check if backup IDs are properly set for (i, item) in backup_data.iter().enumerate() { assert!( !item.backup_item.id.is_empty(), "missing backup id for index {}", i ); } // ADD ATTACHMENTS - BACKUPS rt.block_on(async { println!("performing ADD ATTACHMENTS - BACKUPS operations"); let mut handlers = vec![]; for item in &backup_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { if !item_cloned.backup_item.attachments_holders.is_empty() { add_attachments::run(&mut client_cloned, &item_cloned, None) .await .unwrap(); } })); } for handler in handlers { handler.await.unwrap(); } }); // SEND LOG rt.block_on(async { println!("performing SEND LOG operations"); let mut handlers = vec![]; let (sender, receiver) = channel::<(usize, usize, String)>(); for (backup_index, backup_item) in backup_data.iter().enumerate() { let backup_item_cloned = backup_item.clone(); for log_index in 0..backup_item_cloned.log_items.len() { let backup_item_recloned = backup_item_cloned.clone(); let mut client_cloned = client.clone(); let sender_cloned = sender.clone(); handlers.push(tokio::spawn(async move { println!( "sending log, backup index: [{}] log index: [{}]", backup_index, log_index ); let id = send_log::run( &mut client_cloned, &backup_item_recloned, log_index, ) .await .unwrap(); assert!(!id.is_empty(), "log id should not be empty after sending"); sender_cloned.send((backup_index, log_index, id)).unwrap(); })); } } // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv // The channel is closed when all senders have been dropped, or when close // is called. The best option here is to clone the sender for every // thread, drop the original one and let all the clones be dropped when // going out of scope which is equal to the parent thread's termination. 
drop(sender); for handler in handlers { handler.await.unwrap(); } for data in receiver { println!("received: {:?}", data); let (backup_index, log_index, id) = data; backup_data[backup_index].log_items[log_index].id = id; } }); // check if log IDs are properly set for (backup_index, backup_item) in backup_data.iter().enumerate() { for (log_index, log_item) in backup_item.log_items.iter().enumerate() { assert!( !log_item.id.is_empty(), "missing log id for backup index {} and log index {}", backup_index, log_index ); } } // ADD ATTACHMENTS - LOGS rt.block_on(async { println!("performing ADD ATTACHMENTS - LOGS operations"); let mut handlers = vec![]; for backup_item in &backup_data { let backup_item_cloned = backup_item.clone(); for log_index in 0..backup_item_cloned.log_items.len() { let backup_item_recloned = backup_item_cloned.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { if !backup_item_recloned .backup_item .attachments_holders .is_empty() { add_attachments::run( &mut client_cloned, &backup_item_recloned, Some(log_index), ) .await .unwrap(); } })); } } for handler in handlers { handler.await.unwrap(); } }); // PULL BACKUP rt.block_on(async { println!("performing PULL BACKUP operations"); let mut handlers = vec![]; for item in backup_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let result = pull_backup::run(&mut client_cloned, &item_cloned) .await .unwrap(); backup_utils::compare_backups(&item_cloned, &result); })); } for handler in handlers { handler.await.unwrap(); } }); }) .await .expect("Task panicked"); Ok(()) } diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs index 5aeb1ce67..68959470e 100644 --- a/services/commtest/tests/blob_integration_test.rs +++ b/services/commtest/tests/blob_integration_test.rs @@ -1,74 +1,71 @@ -mod blob; -mod tools; - -use blob::{ +use bytesize::ByteSize; +use commtest::blob::{ blob_utils::{BlobData, BlobServiceClient}, get, put, remove, }; -use bytesize::ByteSize; +use commtest::tools::{self, Error}; use std::env; -use tools::Error; #[tokio::test] async fn blob_integration_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BLOB") .expect("port env var expected but not received"); let mut client = BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; let blob_data = vec![ BlobData { holder: "test_holder001".to_string(), hash: "test_hash001".to_string(), chunks_sizes: vec![ ByteSize::b(100).as_u64() as usize, ByteSize::b(100).as_u64() as usize, ByteSize::b(100).as_u64() as usize, ], }, BlobData { holder: "test_holder002".to_string(), hash: "test_hash002".to_string(), chunks_sizes: vec![ *tools::GRPC_CHUNK_SIZE_LIMIT, *tools::GRPC_CHUNK_SIZE_LIMIT, ByteSize::b(10).as_u64() as usize, ], }, BlobData { holder: "test_holder003".to_string(), hash: "test_hash003".to_string(), chunks_sizes: vec![ *tools::GRPC_CHUNK_SIZE_LIMIT, ByteSize::b(100).as_u64() as usize, *tools::GRPC_CHUNK_SIZE_LIMIT, ], }, ]; for item in &blob_data { let data_exists: bool = put::run(&mut client, &item).await?; assert!(!data_exists, "test data should not exist"); } for (i, blob_item) in blob_data.iter().enumerate() { let received_sizes = get::run(&mut client, &blob_item).await?; let expected_data_size = blob_item.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( expected_data_size, received_data_size, "invalid size of data for index 
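Note: the tokio docs comment repeated above describes the pattern both result-collection loops rely on: clone the sender into every spawned task, drop the original, and the receiver's iterator terminates once the last clone goes out of scope. A self-contained sketch of that pattern with plain threads (the names are illustrative):

use std::sync::mpsc::channel;
use std::thread;

// Minimal sketch of the sender-clone / drop pattern used above: the
// `for ... in receiver` loop ends only after every sender clone is dropped.
fn main() {
  let (sender, receiver) = channel::<(usize, String)>();
  let mut handles = vec![];
  for i in 0..4 {
    let sender = sender.clone();
    handles.push(thread::spawn(move || {
      sender.send((i, format!("id{}", i))).unwrap();
    }));
  }
  // without this drop, the receiver loop below would block forever,
  // because the original sender would still be alive
  drop(sender);
  for handle in handles {
    handle.join().unwrap();
  }
  for (index, id) in receiver {
    println!("received {} for index {}", id, index);
  }
}
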
{}, expected {}, got {}", i, expected_data_size, received_data_size ); } for item in &blob_data { remove::run(&mut client, &item).await?; assert!( get::run(&mut client, &item).await.is_err(), "item should no longer be available" ); } Ok(()) } diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs index e232fb5cb..a4757e3d5 100644 --- a/services/commtest/tests/blob_performance_test.rs +++ b/services/commtest/tests/blob_performance_test.rs @@ -1,116 +1,113 @@ -mod blob; -mod tools; - -use blob::{ +use bytesize::ByteSize; +use commtest::blob::{ blob_utils::{BlobData, BlobServiceClient}, get, put, remove, }; -use bytesize::ByteSize; +use commtest::tools::{obtain_number_of_threads, Error}; use std::env; use tokio::runtime::Runtime; -use tools::{obtain_number_of_threads, Error}; #[tokio::test] async fn blob_performance_test() -> Result<(), Error> { let port = env::var("COMM_SERVICES_PORT_BLOB") .expect("port env var expected but not received"); let client = BlobServiceClient::connect(format!("http://localhost:{}", port)).await?; let number_of_threads = obtain_number_of_threads(); println!( "Running performance tests for blob, number of threads: {}", number_of_threads ); let mut blob_data = vec![]; for i in 0..number_of_threads { let index: u64 = (i as u64) % 10; blob_data.push(BlobData { holder: format!("test_holder_{}", i), hash: format!("test_hash_{}", i), chunks_sizes: vec![ ByteSize::kib(200 + (300 - index * 20)).as_u64() as usize, ByteSize::kib(500 + (400 - index * 20)).as_u64() as usize, ByteSize::kib(700 + (500 - index * 25)).as_u64() as usize, ], }) } let rt = Runtime::new().unwrap(); tokio::task::spawn_blocking(move || { // PUT rt.block_on(async { println!("performing PUT operations"); let mut handlers = vec![]; for item in &blob_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let data_exists: bool = put::run(&mut client_cloned, &item_cloned).await.unwrap(); assert!(!data_exists, "test data should not exist"); })); } for handler in handlers { handler.await.unwrap(); } }); // GET rt.block_on(async { println!("performing GET operations"); let mut handlers = vec![]; for (i, item) in blob_data.iter().enumerate() { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { let received_sizes = get::run(&mut client_cloned, &item_cloned).await.unwrap(); let expected_data_size = item_cloned.chunks_sizes.iter().sum::(); let received_data_size = received_sizes.iter().sum::(); assert_eq!( expected_data_size, received_data_size, "invalid size of data for index {}, expected {}, got {}", i, expected_data_size, received_data_size ); })); } for handler in handlers { handler.await.unwrap(); } }); // REMOVE rt.block_on(async { println!("performing REMOVE operations"); let mut handlers = vec![]; for item in &blob_data { let item_cloned = item.clone(); let mut client_cloned = client.clone(); handlers.push(tokio::spawn(async move { remove::run(&mut client_cloned, &item_cloned).await.unwrap(); assert!( get::run(&mut client_cloned, &item_cloned).await.is_err(), "item should no longer be available" ); })); } for handler in handlers { handler.await.unwrap(); } }); }) .await .expect("Task panicked"); Ok(()) } diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs index ee7a750ba..647d3b161 100644 --- a/services/commtest/tests/tunnelbroker_integration_test.rs 
diff --git a/services/commtest/tests/blob_performance_test.rs b/services/commtest/tests/blob_performance_test.rs
index e232fb5cb..a4757e3d5 100644
--- a/services/commtest/tests/blob_performance_test.rs
+++ b/services/commtest/tests/blob_performance_test.rs
@@ -1,116 +1,113 @@
-mod blob;
-mod tools;
-
-use blob::{
+use bytesize::ByteSize;
+use commtest::blob::{
   blob_utils::{BlobData, BlobServiceClient},
   get, put, remove,
 };
-use bytesize::ByteSize;
+use commtest::tools::{obtain_number_of_threads, Error};
 use std::env;
 use tokio::runtime::Runtime;
-use tools::{obtain_number_of_threads, Error};
 
 #[tokio::test]
 async fn blob_performance_test() -> Result<(), Error> {
   let port = env::var("COMM_SERVICES_PORT_BLOB")
     .expect("port env var expected but not received");
   let client =
     BlobServiceClient::connect(format!("http://localhost:{}", port)).await?;
 
   let number_of_threads = obtain_number_of_threads();
 
   println!(
     "Running performance tests for blob, number of threads: {}",
     number_of_threads
   );
 
   let mut blob_data = vec![];
 
   for i in 0..number_of_threads {
     let index: u64 = (i as u64) % 10;
     blob_data.push(BlobData {
       holder: format!("test_holder_{}", i),
       hash: format!("test_hash_{}", i),
       chunks_sizes: vec![
         ByteSize::kib(200 + (300 - index * 20)).as_u64() as usize,
         ByteSize::kib(500 + (400 - index * 20)).as_u64() as usize,
         ByteSize::kib(700 + (500 - index * 25)).as_u64() as usize,
       ],
     })
   }
 
   let rt = Runtime::new().unwrap();
   tokio::task::spawn_blocking(move || {
     // PUT
     rt.block_on(async {
       println!("performing PUT operations");
       let mut handlers = vec![];
       for item in &blob_data {
         let item_cloned = item.clone();
         let mut client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           let data_exists: bool =
             put::run(&mut client_cloned, &item_cloned).await.unwrap();
           assert!(!data_exists, "test data should not exist");
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });
 
     // GET
     rt.block_on(async {
       println!("performing GET operations");
       let mut handlers = vec![];
       for (i, item) in blob_data.iter().enumerate() {
         let item_cloned = item.clone();
         let mut client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           let received_sizes =
             get::run(&mut client_cloned, &item_cloned).await.unwrap();
           let expected_data_size =
             item_cloned.chunks_sizes.iter().sum::<usize>();
           let received_data_size = received_sizes.iter().sum::<usize>();
           assert_eq!(
             expected_data_size, received_data_size,
             "invalid size of data for index {}, expected {}, got {}",
             i, expected_data_size, received_data_size
           );
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });
 
     // REMOVE
     rt.block_on(async {
       println!("performing REMOVE operations");
       let mut handlers = vec![];
       for item in &blob_data {
         let item_cloned = item.clone();
         let mut client_cloned = client.clone();
         handlers.push(tokio::spawn(async move {
           remove::run(&mut client_cloned, &item_cloned).await.unwrap();
           assert!(
             get::run(&mut client_cloned, &item_cloned).await.is_err(),
             "item should no longer be available"
           );
         }));
       }
       for handler in handlers {
         handler.await.unwrap();
       }
     });
   })
   .await
   .expect("Task panicked");
 
   Ok(())
 }
diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_test.rs
index ee7a750ba..647d3b161 100644
--- a/services/commtest/tests/tunnelbroker_integration_test.rs
+++ b/services/commtest/tests/tunnelbroker_integration_test.rs
@@ -1,6 +1,4 @@
-mod tools;
-
 #[tokio::test]
 async fn tunnelbroker_integration_test() {
   assert!(false, "not implemented");
 }