diff --git a/services/commtest/src/backup/add_attachments.rs b/services/commtest/src/backup/add_attachments.rs deleted file mode 100644 --- a/services/commtest/src/backup/add_attachments.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::backup::backup_utils::{ - proto::AddAttachmentsRequest, BackupData, BackupServiceClient, -}; -use crate::constants::ATTACHMENT_DELIMITER; -use crate::tools::Error; -use tonic::Request; - -// log_index = None means that we add attachments to the backup -// log_index = Some(x) means that we add attachments to a specific log -pub async fn run( - client: &mut BackupServiceClient, - backup_data: &BackupData, - log_index: Option, -) -> Result<(), Error> { - let cloned_user_id = backup_data.user_id.clone(); - let cloned_backup_id = backup_data.backup_item.id.clone(); - let log_id: String = match log_index { - Some(index) => { - let log_id = backup_data.log_items[index].id.clone(); - println!("add attachments for log {}/{}", index, log_id); - log_id - } - None => { - println!("add attachments for backup"); - String::new() - } - }; - - let holders: String = match log_index { - Some(log_index) => backup_data.log_items[log_index] - .attachments_holders - .join(ATTACHMENT_DELIMITER), - None => backup_data - .backup_item - .attachments_holders - .join(ATTACHMENT_DELIMITER), - }; - - client - .add_attachments(Request::new(AddAttachmentsRequest { - user_id: cloned_user_id, - backup_id: cloned_backup_id, - log_id, - holders, - })) - .await?; - Ok(()) -} diff --git a/services/commtest/src/backup/backup_utils.rs b/services/commtest/src/backup/backup_utils.rs deleted file mode 100644 --- a/services/commtest/src/backup/backup_utils.rs +++ /dev/null @@ -1,101 +0,0 @@ -pub mod proto { - tonic::include_proto!("backup"); -} -pub use proto::backup_service_client::BackupServiceClient; -use std::collections::HashMap; - -// stands for both, backup and log items -#[derive(Clone)] -pub struct Item { - pub id: String, - pub chunks_sizes: Vec, - pub attachments_holders: Vec, -} - -impl Item { - pub fn new( - id: String, - chunks_sizes: Vec, - attachments_holders: Vec, - ) -> Item { - Item { - id, - chunks_sizes, - attachments_holders, - } - } -} - -#[derive(Clone)] -pub struct BackupData { - pub user_id: String, - pub device_id: String, - pub backup_item: Item, - pub log_items: Vec, -} - -pub fn compare_backups(backup_data: &BackupData, result: &BackupData) { - // check backup size - let expected: usize = backup_data.backup_item.chunks_sizes.iter().sum(); - let from_result: usize = result.backup_item.chunks_sizes.iter().sum(); - assert_eq!( - from_result, expected, - "backup sizes do not match, expected {}, got {}", - expected, from_result - ); - - // check backup attachments - let expected: usize = backup_data.backup_item.attachments_holders.len(); - let from_result: usize = result.backup_item.attachments_holders.len(); - assert_eq!( - from_result, expected, - "backup: number of attachments holders do not match, expected {}, got {}", - expected, from_result - ); - - // check number of logs - let expected: usize = backup_data.log_items.len(); - let from_result: usize = result.log_items.len(); - assert_eq!( - expected, from_result, - "backup id {} number of logs do not match, expected {}, got {}", - backup_data.backup_item.id, expected, from_result - ); - - // check log sizes - // map - let mut expected_log_map: HashMap = HashMap::new(); - let mut result_log_map: HashMap = HashMap::new(); - for i in 0..backup_data.log_items.len() { - let expected: usize = backup_data.log_items[i].chunks_sizes.iter().sum(); - let insert_result = - expected_log_map.insert(backup_data.log_items[i].id.clone(), expected); - assert_eq!( - insert_result, None, - "expected collection contained duplicated log id: {}", - backup_data.log_items[i].id - ); - let from_result: usize = result.log_items[i].chunks_sizes.iter().sum(); - let insert_result = - result_log_map.insert(result.log_items[i].id.clone(), from_result); - assert_eq!( - insert_result, None, - "expected collection contained duplicated log id: {}", - result.log_items[i].id - ); - } - - for (expected_id, expected_size) in &expected_log_map { - let result_size = result_log_map.get(expected_id).expect(&format!( - "comparing logs: expected id found in result: {}", - expected_id - )); - assert_eq!( - expected_size, result_size, - "comparing logs, sizes don't match, backup {}", - backup_data.backup_item.id - ); - } - - // todo: check logs attachment holders -} diff --git a/services/commtest/src/backup/create_new_backup.rs b/services/commtest/src/backup/create_new_backup.rs deleted file mode 100644 --- a/services/commtest/src/backup/create_new_backup.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::backup::backup_utils::{ - proto::create_new_backup_request::Data::*, proto::CreateNewBackupRequest, - BackupData, BackupServiceClient, -}; -use crate::tools::{generate_stable_nbytes, DataHasher, Error}; -use tonic::Request; - -pub async fn run( - client: &mut BackupServiceClient, - backup_data: &BackupData, -) -> Result { - println!("create new backup"); - let cloned_user_id = backup_data.user_id.clone(); - let cloned_device_id = backup_data.device_id.clone(); - let cloned_backup_chunk_sizes = backup_data.backup_item.chunks_sizes.clone(); - let predefined_byte_value = None; - let outbound = async_stream::stream! { - println!(" - sending user id"); - let request = CreateNewBackupRequest { - data: Some(UserId(cloned_user_id)), - }; - yield request; - println!(" - sending device id"); - let request = CreateNewBackupRequest { - data: Some(DeviceId(cloned_device_id)), - }; - yield request; - println!(" - sending key entropy"); - let request = CreateNewBackupRequest { - data: Some(KeyEntropy(vec![65,66,67,68])), - }; - yield request; - println!(" - sending data hash"); - let mut hasher = DataHasher::new(); - for chunk_size in &cloned_backup_chunk_sizes { - DataHasher::update(&mut hasher, generate_stable_nbytes(*chunk_size, predefined_byte_value)); - } - - let request = CreateNewBackupRequest { - data: Some(NewCompactionHash(hasher.get_hash().as_bytes().to_vec())), - }; - yield request; - for chunk_size in &cloned_backup_chunk_sizes { - println!(" - sending data chunk {}", chunk_size); - let request = CreateNewBackupRequest { - data: Some(NewCompactionChunk(generate_stable_nbytes(*chunk_size, predefined_byte_value))), - }; - yield request; - } - }; - - let mut backup_id: String = String::new(); - let response = client.create_new_backup(Request::new(outbound)).await?; - let mut inbound = response.into_inner(); - while let Some(response) = inbound.message().await? { - if !response.backup_id.is_empty() { - assert!( - backup_id.is_empty(), - "backup id should be returned only once" - ); - backup_id = response.backup_id; - } - } - assert!( - !backup_id.is_empty(), - "could not get a backup id from the server" - ); - Ok(backup_id) -} diff --git a/services/commtest/src/backup/mod.rs b/services/commtest/src/backup/mod.rs deleted file mode 100644 --- a/services/commtest/src/backup/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod add_attachments; -pub mod backup_utils; -pub mod create_new_backup; -pub mod pull_backup; -pub mod send_log; diff --git a/services/commtest/src/backup/pull_backup.rs b/services/commtest/src/backup/pull_backup.rs deleted file mode 100644 --- a/services/commtest/src/backup/pull_backup.rs +++ /dev/null @@ -1,130 +0,0 @@ -use crate::backup::backup_utils::{ - proto::pull_backup_response::Data, proto::pull_backup_response::Data::*, - proto::pull_backup_response::Id, proto::pull_backup_response::Id::*, - proto::PullBackupRequest, BackupData, BackupServiceClient, Item, -}; -use crate::constants::ATTACHMENT_DELIMITER; -use crate::tools::Error; -use std::io::{Error as IOError, ErrorKind}; -use tonic::Request; - -#[derive(PartialEq, Debug)] -enum State { - Compaction, - Log, -} - -pub async fn run( - client: &mut BackupServiceClient, - backup_data: &BackupData, -) -> Result { - println!("pull backup"); - let cloned_user_id = backup_data.user_id.clone(); - let cloned_backup_id = backup_data.backup_item.id.clone(); - - let mut result = BackupData { - user_id: String::new(), - device_id: String::new(), - backup_item: Item::new(String::new(), Vec::new(), Vec::new()), - log_items: Vec::new(), - }; - - let response = client - .pull_backup(Request::new(PullBackupRequest { - user_id: cloned_user_id, - backup_id: cloned_backup_id, - })) - .await?; - let mut inbound = response.into_inner(); - let mut state: State = State::Compaction; - let mut current_id: String = String::new(); - while let Some(response) = inbound.message().await? { - let response_data: Option = response.data; - let id: Option = response.id; - let mut backup_id: Option = None; - let mut log_id: Option = None; - match id { - Some(BackupId(id)) => backup_id = Some(id), - Some(LogId(id)) => log_id = Some(id), - None => {} - }; - match response_data { - Some(CompactionChunk(chunk)) => { - assert_eq!( - state, - State::Compaction, - "invalid state, expected compaction, got {:?}", - state - ); - current_id = backup_id.ok_or(IOError::new( - ErrorKind::Other, - "backup id expected but not received", - ))?; - println!( - "compaction (id {}), pulling chunk (size: {})", - current_id, - chunk.len() - ); - result.backup_item.chunks_sizes.push(chunk.len()) - } - Some(LogChunk(chunk)) => { - if state == State::Compaction { - state = State::Log; - } - assert_eq!( - state, - State::Log, - "invalid state, expected log, got {:?}", - state - ); - let log_id = log_id.ok_or(IOError::new( - ErrorKind::Other, - "log id expected but not received", - ))?; - if log_id != current_id { - result.log_items.push(Item::new( - log_id.clone(), - Vec::new(), - Vec::new(), - )); - current_id = log_id; - } - let log_items_size = result.log_items.len() - 1; - result.log_items[log_items_size] - .chunks_sizes - .push(chunk.len()); - - println!("log (id {}) chunk size {}", current_id, chunk.len()); - } - None => {} - } - if let Some(holders) = response.attachment_holders { - let holders_split: Vec<&str> = - holders.split(ATTACHMENT_DELIMITER).collect(); - if state == State::Compaction { - for holder in holders_split { - if holder.len() == 0 { - continue; - } - println!("attachments for the backup: {}", holder); - result - .backup_item - .attachments_holders - .push(holder.to_string()); - } - } else if state == State::Log { - println!("attachments for the log {:?}: {}", current_id, holders); - for holder in holders_split { - if holder.len() == 0 { - continue; - } - let log_items_size = result.log_items.len() - 1; - result.log_items[log_items_size] - .attachments_holders - .push(holder.to_string()) - } - } - } - } - Ok(result) -} diff --git a/services/commtest/src/backup/send_log.rs b/services/commtest/src/backup/send_log.rs deleted file mode 100644 --- a/services/commtest/src/backup/send_log.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::backup::backup_utils::{ - proto::{send_log_request::Data::*, SendLogRequest}, - BackupData, BackupServiceClient, -}; -use crate::tools::{generate_stable_nbytes, DataHasher, Error}; -use tonic::Request; - -pub async fn run( - client: &mut BackupServiceClient, - backup_data: &BackupData, - log_index: usize, -) -> Result { - println!("send log"); - let cloned_user_id = backup_data.user_id.clone(); - let cloned_backup_id = backup_data.backup_item.id.clone(); - let cloned_log_sizes = backup_data.log_items[log_index].chunks_sizes.clone(); - let predefined_byte_value = None; - let outbound = async_stream::stream! { - println!(" - sending user id"); - let request = SendLogRequest { - data: Some(UserId(cloned_user_id)), - }; - yield request; - println!(" - sending backup id"); - let request = SendLogRequest { - data: Some(BackupId(cloned_backup_id)), - }; - yield request; - println!(" - sending log hash"); - let mut hasher = DataHasher::new(); - for chunk_size in &cloned_log_sizes { - DataHasher::update(&mut hasher, generate_stable_nbytes(*chunk_size, predefined_byte_value)); - } - - let request = SendLogRequest { - data: Some(LogHash(hasher.get_hash().as_bytes().to_vec())), - }; - yield request; - println!(" - sending log data"); - for log_size in &cloned_log_sizes { - println!(" - sending log data {}", *log_size); - let request = SendLogRequest { - data: Some(LogData(generate_stable_nbytes(*log_size, predefined_byte_value))), - }; - yield request; - } - }; - - let response = client.send_log(Request::new(outbound)).await?; - let inbound = response.into_inner(); - println!("send log response: {:?}", inbound.log_checkpoint); - Ok(inbound.log_checkpoint) -} diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs --- a/services/commtest/src/lib.rs +++ b/services/commtest/src/lib.rs @@ -1,4 +1,3 @@ -pub mod backup; pub mod blob; pub mod constants; pub mod identity; diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs deleted file mode 100644 --- a/services/commtest/tests/backup_integration_test.rs +++ /dev/null @@ -1,126 +0,0 @@ -use bytesize::ByteSize; -use commtest::backup::{ - add_attachments, - backup_utils::{self, BackupData, BackupServiceClient, Item}, - create_new_backup, pull_backup, send_log, -}; -use commtest::constants; -use commtest::tools::Error; -use std::collections::HashMap; -use std::env; - -#[tokio::test] -async fn backup_integration_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received"); - let mut client = - BackupServiceClient::connect(format!("http://localhost:{}", port)).await?; - - let attachments_fill_size: u64 = 500; - - let mut backup_data = BackupData { - user_id: "user0000".to_string(), - device_id: "device0000".to_string(), - backup_item: Item::new( - String::new(), - vec![ByteSize::mib(1).as_u64() as usize; 6], - vec![ - "holder0".to_string(), - "holder1".to_string(), - "holder2".to_string(), - ], - ), - log_items: vec![ - // the item that almost hits the DB limit, we're going to later add a long - // list of attachments, so that causes it to exceed the limit. - // In this case its data should be moved to the S3 - Item::new( - String::new(), - vec![ - *constants::DYNAMO_DB_ITEM_SIZE_LIMIT - - ByteSize::b(attachments_fill_size / 2).as_u64() as usize, - ], - vec!["holder0".to_string(), "holder1".to_string()], - ), - // just a small item - Item::new( - String::new(), - vec![ByteSize::b(100).as_u64() as usize], - vec!["holder0".to_string()], - ), - // a big item that should be placed in the S3 right away - Item::new( - String::new(), - vec![ - *constants::GRPC_CHUNK_SIZE_LIMIT, - *constants::GRPC_CHUNK_SIZE_LIMIT, - ], - vec![ - "holder0".to_string(), - "holder1".to_string(), - "holder2".to_string(), - ], - ), - ], - }; - - backup_data.backup_item.id = - create_new_backup::run(&mut client, &backup_data).await?; - println!("backup id in main: {}", backup_data.backup_item.id); - - add_attachments::run(&mut client, &backup_data, None).await?; - - for log_index in 0..backup_data.log_items.len() { - backup_data.log_items[log_index].id = - send_log::run(&mut client, &backup_data, log_index).await?; - add_attachments::run(&mut client, &backup_data, Some(log_index)).await?; - } - - let result: BackupData = pull_backup::run(&mut client, &backup_data).await?; - - backup_utils::compare_backups(&backup_data, &result); - - // push so many attachments that the log item's data will have to be moved - // from the db to the s3 - let mut attachments_size = 0; - let mut i = backup_data.log_items[0].attachments_holders.len(); - let mut new_attachments: Vec = Vec::new(); - while attachments_size < (attachments_fill_size as usize) { - let att = format!("holder{}", i); - attachments_size += att.len(); - new_attachments.push(att); - i += 1; - } - - let mut old_attachments = - backup_data.log_items[0].attachments_holders.clone(); - backup_data.log_items[0].attachments_holders = new_attachments; - add_attachments::run(&mut client, &backup_data, Some(0)).await?; - backup_data.log_items[0] - .attachments_holders - .append(&mut old_attachments); - let result = pull_backup::run(&mut client, &backup_data).await?; - // check logs attachments - // map - let mut expected_log_map: HashMap = HashMap::new(); - let mut result_log_map: HashMap = HashMap::new(); - for i in 0..backup_data.log_items.len() { - let expected: usize = backup_data.log_items[i].attachments_holders.len(); - expected_log_map.insert(backup_data.log_items[i].id.clone(), expected); - let from_result: usize = result.log_items[i].attachments_holders.len(); - result_log_map.insert(result.log_items[i].id.clone(), from_result); - } - for (expected_id, expected_size) in &expected_log_map { - let result_size = result_log_map.get(expected_id).expect(&format!( - "comparing logs attachments: expected id found in result: {}", - expected_id - )); - assert_eq!( - expected_size, result_size, - "comparing logs attachments, sizes don't match, backup {}", - backup_data.backup_item.id - ); - } - - Ok(()) -} diff --git a/services/commtest/tests/backup_performance_test.rs b/services/commtest/tests/backup_performance_test.rs deleted file mode 100644 --- a/services/commtest/tests/backup_performance_test.rs +++ /dev/null @@ -1,231 +0,0 @@ -use bytesize::ByteSize; -use commtest::backup::{ - add_attachments, - backup_utils::{self, BackupData, BackupServiceClient, Item}, - create_new_backup, pull_backup, send_log, -}; -use commtest::tools::{obtain_number_of_threads, Error}; -use std::env; -use std::sync::mpsc::channel; -use tokio::runtime::Runtime; - -#[tokio::test] -async fn backup_performance_test() -> Result<(), Error> { - let port = env::var("COMM_SERVICES_PORT_BACKUP") - .expect("port env var expected but not received"); - let client = - BackupServiceClient::connect(format!("http://localhost:{}", port)).await?; - - let number_of_threads = obtain_number_of_threads(); - - println!( - "Running performance tests for backup, number of threads: {}", - number_of_threads - ); - - let mut backup_data = vec![]; - - for i in 0..number_of_threads { - backup_data.push(BackupData { - user_id: format!("user{}", i), - device_id: format!("device{}", i), - backup_item: Item::new( - String::new(), - vec![ByteSize::mib(1).as_u64() as usize; 3 + (i % 5)], - (0..(i % 5)).map(|x| format!("holder{}", x)).collect(), - ), - log_items: (0..(i % 4)) - .map(|x| { - Item::new( - String::new(), - vec![ByteSize::mib(1).as_u64() as usize; 2 + (x % 2)], - (0..(i % 5)).map(|x| format!("holder{}-{}", i, x)).collect(), - ) - }) - .collect(), - }); - } - - let rt = Runtime::new().unwrap(); - tokio::task::spawn_blocking(move || { - // CREATE NEW BACKUP - rt.block_on(async { - println!("performing CREATE NEW BACKUP operations"); - let mut handlers = vec![]; - let (sender, receiver) = channel::<(usize, String)>(); - for (i, item) in backup_data.iter().enumerate() { - let item_cloned = item.clone(); - let mut client_cloned = client.clone(); - let sender_cloned = sender.clone(); - handlers.push(tokio::spawn(async move { - let id = create_new_backup::run(&mut client_cloned, &item_cloned) - .await - .unwrap(); - assert!( - !id.is_empty(), - "backup id should not be empty after creating a new backup" - ); - sender_cloned.send((i, id)).unwrap(); - })); - } - // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv - // The channel is closed when all senders have been dropped, or when close - // is called. The best option here is to clone the sender for every - // thread, drop the original one and let all the clones be dropped when - // going out of scope which is equal to the parent thread's termination. - drop(sender); - - for handler in handlers { - handler.await.unwrap(); - } - for data in receiver { - println!("received: {:?}", data); - let (index, id) = data; - backup_data[index].backup_item.id = id; - } - }); - - // check if backup IDs are properly set - for (i, item) in backup_data.iter().enumerate() { - assert!( - !item.backup_item.id.is_empty(), - "missing backup id for index {}", - i - ); - } - - // ADD ATTACHMENTS - BACKUPS - rt.block_on(async { - println!("performing ADD ATTACHMENTS - BACKUPS operations"); - let mut handlers = vec![]; - for item in &backup_data { - let item_cloned = item.clone(); - let mut client_cloned = client.clone(); - handlers.push(tokio::spawn(async move { - if !item_cloned.backup_item.attachments_holders.is_empty() { - add_attachments::run(&mut client_cloned, &item_cloned, None) - .await - .unwrap(); - } - })); - } - - for handler in handlers { - handler.await.unwrap(); - } - }); - - // SEND LOG - rt.block_on(async { - println!("performing SEND LOG operations"); - let mut handlers = vec![]; - let (sender, receiver) = channel::<(usize, usize, String)>(); - for (backup_index, backup_item) in backup_data.iter().enumerate() { - let backup_item_cloned = backup_item.clone(); - for log_index in 0..backup_item_cloned.log_items.len() { - let backup_item_recloned = backup_item_cloned.clone(); - let mut client_cloned = client.clone(); - let sender_cloned = sender.clone(); - handlers.push(tokio::spawn(async move { - println!( - "sending log, backup index: [{}] log index: [{}]", - backup_index, log_index - ); - let id = send_log::run( - &mut client_cloned, - &backup_item_recloned, - log_index, - ) - .await - .unwrap(); - assert!(!id.is_empty(), "log id should not be empty after sending"); - sender_cloned.send((backup_index, log_index, id)).unwrap(); - })); - } - } - // https://docs.rs/tokio/1.1.0/tokio/sync/mpsc/struct.Receiver.html#method.recv - // The channel is closed when all senders have been dropped, or when close - // is called. The best option here is to clone the sender for every - // thread, drop the original one and let all the clones be dropped when - // going out of scope which is equal to the parent thread's termination. - drop(sender); - - for handler in handlers { - handler.await.unwrap(); - } - for data in receiver { - println!("received: {:?}", data); - let (backup_index, log_index, id) = data; - backup_data[backup_index].log_items[log_index].id = id; - } - }); - - // check if log IDs are properly set - for (backup_index, backup_item) in backup_data.iter().enumerate() { - for (log_index, log_item) in backup_item.log_items.iter().enumerate() { - assert!( - !log_item.id.is_empty(), - "missing log id for backup index {} and log index {}", - backup_index, - log_index - ); - } - } - - // ADD ATTACHMENTS - LOGS - rt.block_on(async { - println!("performing ADD ATTACHMENTS - LOGS operations"); - let mut handlers = vec![]; - for backup_item in &backup_data { - let backup_item_cloned = backup_item.clone(); - for log_index in 0..backup_item_cloned.log_items.len() { - let backup_item_recloned = backup_item_cloned.clone(); - let mut client_cloned = client.clone(); - handlers.push(tokio::spawn(async move { - if !backup_item_recloned - .backup_item - .attachments_holders - .is_empty() - { - add_attachments::run( - &mut client_cloned, - &backup_item_recloned, - Some(log_index), - ) - .await - .unwrap(); - } - })); - } - } - - for handler in handlers { - handler.await.unwrap(); - } - }); - - // PULL BACKUP - rt.block_on(async { - println!("performing PULL BACKUP operations"); - let mut handlers = vec![]; - for item in backup_data { - let item_cloned = item.clone(); - let mut client_cloned = client.clone(); - handlers.push(tokio::spawn(async move { - let result = pull_backup::run(&mut client_cloned, &item_cloned) - .await - .unwrap(); - backup_utils::compare_backups(&item_cloned, &result); - })); - } - - for handler in handlers { - handler.await.unwrap(); - } - }); - }) - .await - .expect("Task panicked"); - - Ok(()) -}