diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
index 5961f01c0..394cc0f19 100644
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -1,300 +1,308 @@
 mod proto {
   tonic::include_proto!("blob");
 }
 use proto::blob_service_client::BlobServiceClient;
 use proto::put_request;
 use proto::put_request::Data::*;
 use proto::PutRequest;
 
 use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
 use crate::tools::{
   c_char_pointer_to_string, check_error, report_error,
   string_to_c_char_pointer,
 };
 use crate::RUNTIME;
 use lazy_static::lazy_static;
 use libc;
 use libc::c_char;
 use std::collections::HashMap;
 use std::ffi::CStr;
 use std::sync::Mutex;
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
 #[derive(Debug)]
 struct PutRequestData {
   field_index: usize,
   data: Vec<u8>,
 }
 
 struct BidiClient {
   tx: mpsc::Sender<PutRequestData>,
   rx: mpsc::Receiver<String>,
   rx_handle: JoinHandle<()>,
 }
 
 lazy_static! {
   // todo: we should consider limiting the size of the clients map,
   // since every client is able to allocate up to 4MB of data at a time
   static ref CLIENTS: Mutex<HashMap<String, BidiClient>> =
     Mutex::new(HashMap::new());
   static ref ERROR_MESSAGES: Mutex<Vec<String>> = Mutex::new(Vec::new());
 }
 
 fn is_initialized(holder: &str) -> bool {
   match CLIENTS.lock() {
     Ok(clients) => clients.contains_key(holder),
     _ => {
       report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
       false
     }
   }
 }
 
 pub fn put_client_initialize_cxx(
   holder_char: *const c_char,
 ) -> Result<(), String> {
   let holder = c_char_pointer_to_string(holder_char)?;
   if is_initialized(&holder) {
     put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
   }
   assert!(
     !is_initialized(&holder),
     "client cannot be initialized twice"
   );
   // grpc
   if let Ok(mut grpc_client) =
     RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await })
   {
     let (request_thread_tx, mut request_thread_rx) =
       mpsc::channel::<PutRequestData>(MPSC_CHANNEL_BUFFER_CAPACITY);
     let outbound = async_stream::stream! {
       while let Some(data) = request_thread_rx.recv().await {
         let request_data: Option<put_request::Data> = match data.field_index {
           1 => match String::from_utf8(data.data) {
             Ok(utf8_data) => Some(Holder(utf8_data)),
             _ => {
               report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
               None
             }
           },
           2 => match String::from_utf8(data.data).ok() {
             Some(utf8_data) => Some(BlobHash(utf8_data)),
             None => {
               report_error(&ERROR_MESSAGES, "invalid utf-8", Some("put"));
               None
             }
           },
           3 => Some(DataChunk(data.data)),
           _ => {
             report_error(
               &ERROR_MESSAGES,
               &format!("invalid field index value {}", data.field_index),
               Some("put"),
             );
             None
           }
         };
         if let Some(unpacked_data) = request_data {
           let request = PutRequest {
             data: Some(unpacked_data),
           };
           yield request;
         } else {
           report_error(
             &ERROR_MESSAGES,
             "an error occurred, aborting connection",
             Some("put"),
           );
           break;
         }
       }
     };
     // spawn receiver thread
     let (response_thread_tx, response_thread_rx) =
       mpsc::channel::<String>(MPSC_CHANNEL_BUFFER_CAPACITY);
     let rx_handle = RUNTIME.spawn(async move {
       match grpc_client.put(tonic::Request::new(outbound)).await {
         Ok(response) => {
           let mut inner_response = response.into_inner();
           loop {
             match inner_response.message().await {
               Ok(maybe_response_message) => {
                 let mut result = false;
                 if let Some(response_message) = maybe_response_message {
                   // warning: this will produce an error if there are more
                   // unread responses than MPSC_CHANNEL_BUFFER_CAPACITY;
                   // you should then use put_client_blocking_read_cxx to
                   // dequeue the responses in C++ and make room for more
                   if let Ok(_) = response_thread_tx
                     .try_send((response_message.data_exists as i32).to_string())
                   {
                     result = true;
                   } else {
                     report_error(
                       &ERROR_MESSAGES,
                       "response queue full",
                       Some("put"),
                     );
                   }
                 }
                 if !result {
                   break;
                 }
               }
               Err(err) => {
                 report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
                 break;
               }
             };
           }
         }
         Err(err) => {
           report_error(&ERROR_MESSAGES, &err.to_string(), Some("put"));
         }
       };
     });
     if is_initialized(&holder) {
       return Err(format!(
         "client initialization overlapped for holder {}",
         holder
       ));
     }
     if let Ok(mut clients) = CLIENTS.lock() {
       let client = BidiClient {
         tx: request_thread_tx,
         rx: response_thread_rx,
         rx_handle,
       };
       (*clients).insert(holder, client);
       return Ok(());
     }
     return Err(format!("could not access client for holder {}", holder));
   }
   Err("could not successfully connect to the blob server".to_string())
 }
 
 pub fn put_client_blocking_read_cxx(
   holder_char: *const c_char,
 ) -> Result<String, String> {
   let holder = c_char_pointer_to_string(holder_char)?;
   check_error(&ERROR_MESSAGES)?;
   let response: Option<String> = RUNTIME.block_on(async {
     if let Ok(mut clients) = CLIENTS.lock() {
       let maybe_client = clients.get_mut(&holder);
       if let Some(client) = maybe_client {
         if let Some(data) = client.rx.recv().await {
           return Some(data);
         } else {
           report_error(
             &ERROR_MESSAGES,
             "couldn't receive data via client's receiver",
             Some("put"),
           );
         }
       } else {
-        report_error(&ERROR_MESSAGES, "no client detected in blocking read", Some("put"));
+        report_error(
+          &ERROR_MESSAGES,
+          "no client detected in blocking read",
+          Some("put"),
+        );
       }
     } else {
       report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
     }
     None
   });
   check_error(&ERROR_MESSAGES)?;
   response.ok_or("response not received properly".to_string())
 }
 
 /**
  * field index:
  * 1 - holder (utf8 string)
  * 2 - blob hash (utf8 string)
  * 3 - data chunk (bytes)
  */
 pub fn put_client_write_cxx(
   holder_char: *const c_char,
   field_index: usize,
   data: *const c_char,
 ) -> Result<(), String> {
   let holder = c_char_pointer_to_string(holder_char)?;
   check_error(&ERROR_MESSAGES)?;
   let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
   let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
   RUNTIME.block_on(async {
     if let Ok(clients) = CLIENTS.lock() {
       let maybe_client = clients.get(&holder);
       if let Some(client) = maybe_client {
         match client
           .tx
           .send(PutRequestData {
             field_index,
             data: data_bytes,
           })
           .await
         {
           Ok(_) => (),
           Err(err) => report_error(
             &ERROR_MESSAGES,
             &format!("send data to receiver failed: {}", err),
             Some("put"),
           ),
         }
       } else {
-        report_error(&ERROR_MESSAGES, "no client detected in write", Some("put"));
+        report_error(
+          &ERROR_MESSAGES,
+          "no client detected in write",
+          Some("put"),
+        );
       }
     } else {
       report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
     }
   });
   check_error(&ERROR_MESSAGES)?;
   Ok(())
 }
 
 pub fn put_client_terminate_cxx(
   holder_char: *const c_char,
 ) -> Result<(), String> {
   let holder = c_char_pointer_to_string(holder_char)?;
   check_error(&ERROR_MESSAGES)?;
   if !is_initialized(&holder) {
     check_error(&ERROR_MESSAGES)?;
     return Ok(());
   }
   if let Ok(mut clients) = CLIENTS.lock() {
     let maybe_client = clients.remove(&holder);
     if let Some(client) = maybe_client {
       drop(client.tx);
       RUNTIME.block_on(async {
         if client.rx_handle.await.is_err() {
           report_error(
             &ERROR_MESSAGES,
             "wait for receiver handle failed",
             Some("put"),
           );
         }
       });
     } else {
       return Err("no client detected in terminate".to_string());
     }
   } else {
     return Err("couldn't access client".to_string());
   }
   assert!(
     !is_initialized(&holder),
     "client transmitter handler not released properly"
   );
   check_error(&ERROR_MESSAGES)?;
   Ok(())
 }
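Note (not part of the patch): the C++ caller that drives these four FFI functions isn't shown here. The sketch below illustrates the intended call order in Rust instead; upload_blob is a hypothetical helper, and the assumption that the server answers the blob hash with a single data_exists response ("1" when the blob is already stored) is inferred from the receiver thread above, not confirmed by this diff.

// Hypothetical driver for the put client above; assumes it lives in the
// same crate as put_client.rs so the *_cxx functions are in scope.
use std::ffi::CString;

fn upload_blob(
  holder: &str,
  blob_hash: &str,
  chunks: &[&str],
) -> Result<(), String> {
  let holder_c = CString::new(holder).map_err(|e| e.to_string())?;
  let hash_c = CString::new(blob_hash).map_err(|e| e.to_string())?;
  put_client_initialize_cxx(holder_c.as_ptr())?;
  // field indices per the doc comment on put_client_write_cxx:
  // 1 - holder, 2 - blob hash, 3 - data chunk
  put_client_write_cxx(holder_c.as_ptr(), 1, holder_c.as_ptr())?;
  put_client_write_cxx(holder_c.as_ptr(), 2, hash_c.as_ptr())?;
  // the receiver thread forwards data_exists as "0"/"1"; skip the chunk
  // upload if the server already holds this blob (assumed protocol)
  if put_client_blocking_read_cxx(holder_c.as_ptr())? != "1" {
    for chunk in chunks {
      // note: data crosses the FFI boundary as a NUL-terminated C string,
      // so a chunk with embedded NUL bytes can't be represented this way
      let chunk_c = CString::new(*chunk).map_err(|e| e.to_string())?;
      put_client_write_cxx(holder_c.as_ptr(), 3, chunk_c.as_ptr())?;
    }
  }
  put_client_terminate_cxx(holder_c.as_ptr())
}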
diff --git a/services/commtest/tests/backup_integration_test.rs b/services/commtest/tests/backup_integration_test.rs
index 231ef70cc..80d9decbc 100644
--- a/services/commtest/tests/backup_integration_test.rs
+++ b/services/commtest/tests/backup_integration_test.rs
@@ -1,125 +1,134 @@
 #[path = "./backup/add_attachments.rs"]
 mod add_attachments;
 #[path = "./backup/backup_utils.rs"]
 mod backup_utils;
 #[path = "./backup/create_new_backup.rs"]
 mod create_new_backup;
 #[path = "./backup/pull_backup.rs"]
 mod pull_backup;
 #[path = "./backup/send_log.rs"]
 mod send_log;
 #[path = "./lib/tools.rs"]
 mod tools;
 
 use backup_utils::{BackupData, Item};
 use bytesize::ByteSize;
+use std::collections::HashMap;
 use tools::Error;
 
 use std::env;
 
 use backup_utils::BackupServiceClient;
 
 #[tokio::test]
 async fn backup_integration_test() -> Result<(), Error> {
   let port = env::var("COMM_SERVICES_PORT_BACKUP")
     .expect("port env var expected but not received");
   let mut client =
     BackupServiceClient::connect(format!("http://localhost:{}", port)).await?;
 
   let attachments_fill_size: u64 = 500;
 
   let mut backup_data = BackupData {
     user_id: "user0000".to_string(),
     device_id: "device0000".to_string(),
     backup_item: Item::new(
       String::new(),
       vec![ByteSize::mib(1).as_u64() as usize; 6],
       vec![
         "holder0".to_string(),
         "holder1".to_string(),
         "holder2".to_string(),
       ],
     ),
     log_items: vec![
       // an item that almost hits the DB limit; we're going to later add a
       // long list of attachments, causing it to exceed the limit. In that
       // case its data should be moved to S3
       Item::new(
         String::new(),
         vec![
           *tools::DYNAMO_DB_ITEM_SIZE_LIMIT
             - ByteSize::b(attachments_fill_size / 2).as_u64() as usize,
         ],
         vec!["holder0".to_string(), "holder1".to_string()],
       ),
       // just a small item
       Item::new(
         String::new(),
         vec![ByteSize::b(100).as_u64() as usize],
         vec!["holder0".to_string()],
       ),
       // a big item that should be placed in S3 right away
       Item::new(
         String::new(),
         vec![*tools::GRPC_CHUNK_SIZE_LIMIT, *tools::GRPC_CHUNK_SIZE_LIMIT],
         vec![
           "holder0".to_string(),
           "holder1".to_string(),
           "holder2".to_string(),
         ],
       ),
     ],
   };
 
   backup_data.backup_item.id =
     create_new_backup::run(&mut client, &backup_data).await?;
   println!("backup id in main: {}", backup_data.backup_item.id);
 
   add_attachments::run(&mut client, &backup_data, None).await?;
 
   for log_index in 0..backup_data.log_items.len() {
     backup_data.log_items[log_index].id =
       send_log::run(&mut client, &backup_data, log_index).await?;
     add_attachments::run(&mut client, &backup_data, Some(log_index)).await?;
   }
 
   let result: BackupData = pull_backup::run(&mut client, &backup_data).await?;
   backup_utils::compare_backups(&backup_data, &result);
 
   // push so many attachments that the log item's data will have to be moved
   // from the db to the s3
   let mut attachments_size = 0;
   let mut i = backup_data.log_items[0].attachments_holders.len();
   let mut new_attachments: Vec<String> = Vec::new();
   while attachments_size < (attachments_fill_size as usize) {
     let att = format!("holder{}", i);
     attachments_size += att.len();
     new_attachments.push(att);
     i += 1;
   }
 
   let mut old_attachments =
     backup_data.log_items[0].attachments_holders.clone();
   backup_data.log_items[0].attachments_holders = new_attachments;
   add_attachments::run(&mut client, &backup_data, Some(0)).await?;
   backup_data.log_items[0]
     .attachments_holders
     .append(&mut old_attachments);
   let result = pull_backup::run(&mut client, &backup_data).await?;
   // check logs attachments
+  // collect attachment counts per log id so the comparison is order-independent
+  let mut expected_log_map: HashMap<String, usize> = HashMap::new();
+  let mut result_log_map: HashMap<String, usize> = HashMap::new();
   for i in 0..backup_data.log_items.len() {
     let expected: usize = backup_data.log_items[i].attachments_holders.len();
+    expected_log_map.insert(backup_data.log_items[i].id.clone(), expected);
     let from_result: usize = result.log_items[i].attachments_holders.len();
+    result_log_map.insert(result.log_items[i].id.clone(), from_result);
+  }
+  for (expected_id, expected_size) in &expected_log_map {
+    let result_size = result_log_map.get(expected_id).expect(&format!(
+      "comparing logs attachments: expected id not found in result: {}",
+      expected_id
+    ));
     assert_eq!(
-      from_result, expected,
-      "after attachment add: log {}: number of attachments holders do not match,
-      expected {}, got {}",
-      i,
-      expected,
-      from_result
+      expected_size, result_size,
+      "comparing logs attachments, sizes don't match, backup {}",
+      backup_data.backup_item.id
     );
   }
 
   Ok(())
 }
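Note (not part of the patch): put_client.rs surfaces background-task failures through report_error/check_error from crate::tools, which this diff doesn't touch. For readers without the repo at hand, here is a minimal sketch of the contract those calls imply, assuming report_error queues a labeled message and check_error drains the queue into an Err at the next FFI call; the real tools.rs may differ.

// Sketch of the assumed error-queue helpers; illustrative only.
use std::sync::Mutex;

pub fn report_error(
  error_messages: &Mutex<Vec<String>>,
  message: &str,
  label: Option<&str>,
) {
  // tag the message with its source ("put"/"get") and queue it for the
  // next check_error call on the FFI thread
  let message = match label {
    Some(label) => format!("[{}] {}", label, message),
    None => message.to_string(),
  };
  if let Ok(mut messages) = error_messages.lock() {
    messages.push(message);
  }
}

pub fn check_error(error_messages: &Mutex<Vec<String>>) -> Result<(), String> {
  // surface every queued background error as a single Err on the caller's side
  let mut messages = error_messages
    .lock()
    .map_err(|_| "couldn't access error messages".to_string())?;
  if messages.is_empty() {
    return Ok(());
  }
  let error = messages.join("\n");
  messages.clear();
  Err(error)
}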