diff --git a/services/commtest/src/backup/add_attachments.rs b/services/commtest/src/backup/add_attachments.rs
index df14db8d3..949fab828 100644
--- a/services/commtest/src/backup/add_attachments.rs
+++ b/services/commtest/src/backup/add_attachments.rs
@@ -1,47 +1,48 @@
 use crate::backup::backup_utils::{
   proto::AddAttachmentsRequest, BackupData, BackupServiceClient,
 };
-use crate::tools::{Error, ATTACHMENT_DELIMITER};
+use crate::constants::ATTACHMENT_DELIMITER;
+use crate::tools::Error;
 use tonic::Request;
 
 // log_index = None means that we add attachments to the backup
 // log_index = Some(x) means that we add attachments to a specific log
 pub async fn run(
   client: &mut BackupServiceClient<tonic::transport::Channel>,
   backup_data: &BackupData,
   log_index: Option<usize>,
 ) -> Result<(), Error> {
   let cloned_user_id = backup_data.user_id.clone();
   let cloned_backup_id = backup_data.backup_item.id.clone();
   let log_id: String = match log_index {
     Some(index) => {
       let log_id = backup_data.log_items[index].id.clone();
       println!("add attachments for log {}/{}", index, log_id);
       log_id
     }
     None => {
       println!("add attachments for backup");
       String::new()
     }
   };
 
   let holders: String = match log_index {
     Some(log_index) => backup_data.log_items[log_index]
       .attachments_holders
       .join(ATTACHMENT_DELIMITER),
     None => backup_data
       .backup_item
       .attachments_holders
       .join(ATTACHMENT_DELIMITER),
   };
 
   client
     .add_attachments(Request::new(AddAttachmentsRequest {
       user_id: cloned_user_id,
       backup_id: cloned_backup_id,
       log_id,
       holders,
     }))
     .await?;
 
   Ok(())
 }
diff --git a/services/commtest/src/backup/pull_backup.rs b/services/commtest/src/backup/pull_backup.rs
index 1e8866cf4..7418a5a56 100644
--- a/services/commtest/src/backup/pull_backup.rs
+++ b/services/commtest/src/backup/pull_backup.rs
@@ -1,129 +1,130 @@
 use crate::backup::backup_utils::{
   proto::pull_backup_response::Data, proto::pull_backup_response::Data::*,
   proto::pull_backup_response::Id, proto::pull_backup_response::Id::*,
   proto::PullBackupRequest, BackupData, BackupServiceClient, Item,
 };
-use crate::tools::{Error, ATTACHMENT_DELIMITER};
+use crate::constants::ATTACHMENT_DELIMITER;
+use crate::tools::Error;
 use std::io::{Error as IOError, ErrorKind};
 use tonic::Request;
 
 #[derive(PartialEq, Debug)]
 enum State {
   Compaction,
   Log,
 }
 
 pub async fn run(
   client: &mut BackupServiceClient<tonic::transport::Channel>,
   backup_data: &BackupData,
 ) -> Result<BackupData, Error> {
   println!("pull backup");
   let cloned_user_id = backup_data.user_id.clone();
   let cloned_backup_id = backup_data.backup_item.id.clone();
 
   let mut result = BackupData {
     user_id: String::new(),
     device_id: String::new(),
     backup_item: Item::new(String::new(), Vec::new(), Vec::new()),
     log_items: Vec::new(),
   };
 
   let response = client
     .pull_backup(Request::new(PullBackupRequest {
       user_id: cloned_user_id,
       backup_id: cloned_backup_id,
     }))
     .await?;
   let mut inbound = response.into_inner();
   let mut state: State = State::Compaction;
   let mut current_id: String = String::new();
   while let Some(response) = inbound.message().await? {
     let response_data: Option<Data> = response.data;
     let id: Option<Id> = response.id;
     let mut backup_id: Option<String> = None;
     let mut log_id: Option<String> = None;
     match id {
       Some(BackupId(id)) => backup_id = Some(id),
       Some(LogId(id)) => log_id = Some(id),
       None => {}
     };
     match response_data {
       Some(CompactionChunk(chunk)) => {
         assert_eq!(
           state,
           State::Compaction,
           "invalid state, expected compaction, got {:?}",
           state
         );
         current_id = backup_id.ok_or(IOError::new(
           ErrorKind::Other,
           "backup id expected but not received",
         ))?;
         println!(
           "compaction (id {}), pulling chunk (size: {})",
           current_id,
           chunk.len()
         );
         result.backup_item.chunks_sizes.push(chunk.len())
       }
       Some(LogChunk(chunk)) => {
         if state == State::Compaction {
           state = State::Log;
         }
         assert_eq!(
           state,
           State::Log,
           "invalid state, expected log, got {:?}",
           state
         );
         let log_id = log_id.ok_or(IOError::new(
           ErrorKind::Other,
           "log id expected but not received",
         ))?;
         if log_id != current_id {
           result.log_items.push(Item::new(
             log_id.clone(),
             Vec::new(),
             Vec::new(),
           ));
           current_id = log_id;
         }
         let log_items_size = result.log_items.len() - 1;
         result.log_items[log_items_size]
           .chunks_sizes
           .push(chunk.len());
         println!("log (id {}) chunk size {}", current_id, chunk.len());
       }
       None => {}
     }
 
     if let Some(holders) = response.attachment_holders {
       let holders_split: Vec<&str> =
         holders.split(ATTACHMENT_DELIMITER).collect();
       if state == State::Compaction {
         for holder in holders_split {
           if holder.len() == 0 {
             continue;
           }
           println!("attachments for the backup: {}", holder);
           result
             .backup_item
             .attachments_holders
             .push(holder.to_string());
         }
       } else if state == State::Log {
         println!("attachments for the log {:?}: {}", current_id, holders);
         for holder in holders_split {
           if holder.len() == 0 {
             continue;
           }
           let log_items_size = result.log_items.len() - 1;
           result.log_items[log_items_size]
             .attachments_holders
             .push(holder.to_string())
         }
       }
     }
   }
 
   Ok(result)
 }
diff --git a/services/commtest/src/constants.rs b/services/commtest/src/constants.rs
new file mode 100644
index 000000000..59e86fc92
--- /dev/null
+++ b/services/commtest/src/constants.rs
@@ -0,0 +1,13 @@
+use bytesize::ByteSize;
+use lazy_static::lazy_static;
+
+pub const ATTACHMENT_DELIMITER: &str = ";";
+
+pub const GRPC_METADATA_SIZE_BYTES: usize = 5;
+
+lazy_static! {
+  pub static ref DYNAMO_DB_ITEM_SIZE_LIMIT: usize =
+    ByteSize::kib(400).as_u64() as usize;
+  pub static ref GRPC_CHUNK_SIZE_LIMIT: usize =
+    (ByteSize::mib(4).as_u64() as usize) - GRPC_METADATA_SIZE_BYTES;
+}
diff --git a/services/commtest/src/lib.rs b/services/commtest/src/lib.rs
index d58cc0f78..45d03b1f1 100644
--- a/services/commtest/src/lib.rs
+++ b/services/commtest/src/lib.rs
@@ -1,3 +1,4 @@
 pub mod backup;
 pub mod blob;
+pub mod constants;
 pub mod tools;
diff --git a/services/commtest/src/tools.rs b/services/commtest/src/tools.rs
index fe0d76490..cf655adc7 100644
--- a/services/commtest/src/tools.rs
+++ b/services/commtest/src/tools.rs
@@ -1,66 +1,53 @@
-use bytesize::ByteSize;
 use hex::ToHex;
-use lazy_static::lazy_static;
 use num_cpus;
 use sha2::{Digest, Sha512};
 use std::env;
 
 pub fn generate_stable_nbytes(
   number_of_bytes: usize,
   predefined_byte_value: Option<u8>,
 ) -> Vec<u8> {
   let byte_value = predefined_byte_value.unwrap_or(b'A');
   return vec![byte_value; number_of_bytes];
 }
 
 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum Error {
   #[display(...)]
   Proto(std::io::Error),
   #[display(...)]
   Tonic(tonic::transport::Error),
   #[display(...)]
   TonicStatus(tonic::Status),
 }
 
-pub const GRPC_METADATA_SIZE_BYTES: usize = 5;
-
-lazy_static! {
-  pub static ref DYNAMO_DB_ITEM_SIZE_LIMIT: usize =
-    ByteSize::kib(400).as_u64() as usize;
-  pub static ref GRPC_CHUNK_SIZE_LIMIT: usize =
-    (ByteSize::mib(4).as_u64() as usize) - GRPC_METADATA_SIZE_BYTES;
-}
-
-pub const ATTACHMENT_DELIMITER: &str = ";";
-
 pub fn obtain_number_of_threads() -> usize {
   let number_of_threads_str: String =
     env::var("COMM_NUMBER_OF_THREADS").unwrap();
   if number_of_threads_str.is_empty() {
     return num_cpus::get();
   }
   return number_of_threads_str.parse::<usize>().unwrap();
 }
 
 pub struct DataHasher {
   hasher: Sha512,
 }
 
 impl DataHasher {
   pub fn new() -> DataHasher {
     return DataHasher {
       hasher: Sha512::new(),
     };
   }
 
   pub fn update(data_hasher: &mut DataHasher, bytes: Vec<u8>) {
     data_hasher.hasher.update(bytes);
   }
 
   pub fn get_hash(self) -> String {
     let hash = self.hasher.finalize();
     return hash.encode_hex::<String>();
   }
 }
diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
index 6a0ef7efb..7a56233e2 100644
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -1,122 +1,126 @@
 use bytesize::ByteSize;
 use commtest::backup::{
   add_attachments,
   backup_utils::{self, BackupData, BackupServiceClient, Item},
   create_new_backup, pull_backup, send_log,
 };
-use commtest::tools::{self, Error};
+use commtest::constants;
+use commtest::tools::Error;
 use std::collections::HashMap;
 use std::env;
 
 #[tokio::test]
 async fn backup_integration_test() -> Result<(), Error> {
   let port = env::var("COMM_SERVICES_PORT_BACKUP")
     .expect("port env var expected but not received");
   let mut client =
     BackupServiceClient::connect(format!("http://localhost:{}", port)).await?;
 
   let attachments_fill_size: u64 = 500;
 
   let mut backup_data = BackupData {
     user_id: "user0000".to_string(),
     device_id: "device0000".to_string(),
     backup_item: Item::new(
       String::new(),
       vec![ByteSize::mib(1).as_u64() as usize; 6],
       vec![
         "holder0".to_string(),
         "holder1".to_string(),
         "holder2".to_string(),
       ],
     ),
     log_items: vec![
       // the item that almost hits the DB limit, we're going to later add a long
       // list of attachments, so that causes it to exceed the limit.
       // In this case its data should be moved to the S3
       Item::new(
         String::new(),
         vec![
-          *tools::DYNAMO_DB_ITEM_SIZE_LIMIT
+          *constants::DYNAMO_DB_ITEM_SIZE_LIMIT
             - ByteSize::b(attachments_fill_size / 2).as_u64() as usize,
         ],
         vec!["holder0".to_string(), "holder1".to_string()],
       ),
       // just a small item
       Item::new(
         String::new(),
         vec![ByteSize::b(100).as_u64() as usize],
         vec!["holder0".to_string()],
       ),
       // a big item that should be placed in the S3 right away
       Item::new(
         String::new(),
-        vec![*tools::GRPC_CHUNK_SIZE_LIMIT, *tools::GRPC_CHUNK_SIZE_LIMIT],
+        vec![
+          *constants::GRPC_CHUNK_SIZE_LIMIT,
+          *constants::GRPC_CHUNK_SIZE_LIMIT,
+        ],
         vec![
           "holder0".to_string(),
           "holder1".to_string(),
           "holder2".to_string(),
         ],
       ),
     ],
   };
 
   backup_data.backup_item.id =
     create_new_backup::run(&mut client, &backup_data).await?;
   println!("backup id in main: {}", backup_data.backup_item.id);
 
   add_attachments::run(&mut client, &backup_data, None).await?;
 
   for log_index in 0..backup_data.log_items.len() {
     backup_data.log_items[log_index].id =
       send_log::run(&mut client, &backup_data, log_index).await?;
     add_attachments::run(&mut client, &backup_data, Some(log_index)).await?;
   }
 
   let result: BackupData = pull_backup::run(&mut client, &backup_data).await?;
   backup_utils::compare_backups(&backup_data, &result);
 
   // push so many attachments that the log item's data will have to be moved
   // from the db to the s3
   let mut attachments_size = 0;
   let mut i = backup_data.log_items[0].attachments_holders.len();
   let mut new_attachments: Vec<String> = Vec::new();
   while attachments_size < (attachments_fill_size as usize) {
     let att = format!("holder{}", i);
     attachments_size += att.len();
     new_attachments.push(att);
     i += 1;
   }
 
   let mut old_attachments =
     backup_data.log_items[0].attachments_holders.clone();
   backup_data.log_items[0].attachments_holders = new_attachments;
   add_attachments::run(&mut client, &backup_data, Some(0)).await?;
   backup_data.log_items[0]
     .attachments_holders
     .append(&mut old_attachments);
   let result = pull_backup::run(&mut client, &backup_data).await?;
   // check logs attachments
   // map of log id to the number of its attachments
   let mut expected_log_map: HashMap<String, usize> = HashMap::new();
   let mut result_log_map: HashMap<String, usize> = HashMap::new();
   for i in 0..backup_data.log_items.len() {
     let expected: usize = backup_data.log_items[i].attachments_holders.len();
     expected_log_map.insert(backup_data.log_items[i].id.clone(), expected);
     let from_result: usize = result.log_items[i].attachments_holders.len();
     result_log_map.insert(result.log_items[i].id.clone(), from_result);
   }
   for (expected_id, expected_size) in &expected_log_map {
     let result_size = result_log_map.get(expected_id).expect(&format!(
       "comparing logs attachments: expected id found in result: {}",
       expected_id
     ));
     assert_eq!(
       expected_size, result_size,
       "comparing logs attachments, sizes don't match, backup {}",
       backup_data.backup_item.id
     );
   }
 
   Ok(())
 }
diff --git a/services/commtest/tests/blob_integration_test.rs b/services/commtest/tests/blob_integration_test.rs
index 68959470e..d1604cdfb 100644
--- a/services/commtest/tests/blob_integration_test.rs
+++ b/services/commtest/tests/blob_integration_test.rs
@@ -1,71 +1,72 @@
 use bytesize::ByteSize;
 use commtest::blob::{
   blob_utils::{BlobData, BlobServiceClient},
   get, put, remove,
 };
-use commtest::tools::{self, Error};
+use commtest::constants;
+use commtest::tools::Error;
 use std::env;
 
 #[tokio::test]
 async fn blob_integration_test() -> Result<(), Error> {
   let port = env::var("COMM_SERVICES_PORT_BLOB")
     .expect("port env var expected but not received");
   let mut client =
     BlobServiceClient::connect(format!("http://localhost:{}", port)).await?;
 
   let blob_data = vec![
     BlobData {
       holder: "test_holder001".to_string(),
       hash: "test_hash001".to_string(),
       chunks_sizes: vec![
         ByteSize::b(100).as_u64() as usize,
         ByteSize::b(100).as_u64() as usize,
         ByteSize::b(100).as_u64() as usize,
       ],
     },
     BlobData {
       holder: "test_holder002".to_string(),
       hash: "test_hash002".to_string(),
       chunks_sizes: vec![
-        *tools::GRPC_CHUNK_SIZE_LIMIT,
-        *tools::GRPC_CHUNK_SIZE_LIMIT,
+        *constants::GRPC_CHUNK_SIZE_LIMIT,
+        *constants::GRPC_CHUNK_SIZE_LIMIT,
         ByteSize::b(10).as_u64() as usize,
       ],
     },
     BlobData {
       holder: "test_holder003".to_string(),
       hash: "test_hash003".to_string(),
       chunks_sizes: vec![
-        *tools::GRPC_CHUNK_SIZE_LIMIT,
+        *constants::GRPC_CHUNK_SIZE_LIMIT,
         ByteSize::b(100).as_u64() as usize,
-        *tools::GRPC_CHUNK_SIZE_LIMIT,
+        *constants::GRPC_CHUNK_SIZE_LIMIT,
       ],
     },
   ];
 
   for item in &blob_data {
     let data_exists: bool = put::run(&mut client, &item).await?;
     assert!(!data_exists, "test data should not exist");
   }
 
   for (i, blob_item) in blob_data.iter().enumerate() {
     let received_sizes = get::run(&mut client, &blob_item).await?;
     let expected_data_size = blob_item.chunks_sizes.iter().sum::<usize>();
     let received_data_size = received_sizes.iter().sum::<usize>();
     assert_eq!(
       expected_data_size, received_data_size,
       "invalid size of data for index {}, expected {}, got {}",
       i, expected_data_size, received_data_size
     );
   }
 
   for item in &blob_data {
     remove::run(&mut client, &item).await?;
     assert!(
       get::run(&mut client, &item).await.is_err(),
       "item should no longer be available"
     );
   }
 
   Ok(())
 }