diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -6,7 +6,7 @@ use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{report_error, check_error}; +use crate::tools::{check_error, report_error}; use lazy_static::lazy_static; use libc; use libc::c_char; diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -16,13 +16,20 @@ mod ffi { extern "Rust" { // put - fn put_client_initialize_cxx() -> Result<()>; + unsafe fn put_client_initialize_cxx( + holder_char: *const c_char, + ) -> Result<()>; unsafe fn put_client_write_cxx( + holder_char: *const c_char, field_index: usize, data: *const c_char, ) -> Result<()>; - fn put_client_blocking_read_cxx() -> Result; - fn put_client_terminate_cxx() -> Result<()>; + unsafe fn put_client_blocking_read_cxx( + holder_char: *const c_char, + ) -> Result; + unsafe fn put_client_terminate_cxx( + holder_char: *const c_char, + ) -> Result<()>; // get unsafe fn get_client_initialize_cxx( holder_char: *const c_char, diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -9,10 +9,13 @@ use proto::PutResponse; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{report_error, check_error}; +use crate::tools::{ + c_char_pointer_to_string, check_error, report_error, string_to_c_char_pointer, +}; use lazy_static::lazy_static; use libc; use libc::c_char; +use std::collections::HashMap; use std::ffi::CStr; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; @@ -33,30 +36,35 @@ } lazy_static! { - static ref CLIENT: Arc>> = - Arc::new(Mutex::new(None)); - // todo we should probably create separate clients for different IDs + // todo: we should consider limiting the clients size, + // if every client is able to allocate up to 4MB data at a time + static ref CLIENTS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); static ref RUNTIME: Runtime = Runtime::new().unwrap(); static ref ERROR_MESSAGES: Arc>> = Arc::new(Mutex::new(Vec::new())); } -fn is_initialized() -> bool { - if let Ok(client) = CLIENT.lock() { - if client.is_some() { - return true; - } +fn is_initialized(holder: &String) -> bool { + if let Ok(clients) = CLIENTS.lock() { + return clients.contains_key(holder); } else { report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } false } -pub fn put_client_initialize_cxx() -> Result<(), String> { - if is_initialized() { - put_client_terminate_cxx()?; +pub fn put_client_initialize_cxx( + holder_char: *const c_char, +) -> Result<(), String> { + let holder = c_char_pointer_to_string(holder_char); + if is_initialized(&holder) { + put_client_terminate_cxx(string_to_c_char_pointer(&holder))?; } - assert!(!is_initialized(), "client cannot be initialized twice"); + assert!( + !is_initialized(&holder), + "client cannot be initialized twice" + ); // grpc if let Ok(mut grpc_client) = RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) @@ -91,7 +99,13 @@ Some(DataChunk(data.data)) } _ => { - report_error(&ERROR_MESSAGES, &format!("invalid field index value {}", data.field_index), Some("put")); + report_error( + &ERROR_MESSAGES, + &format!( + "invalid field index value {}", + data.field_index), + Some("put") + ); None } }; @@ -101,7 +115,10 @@ }; yield request; } else { - report_error(&ERROR_MESSAGES, "an error occured, aborting connection", Some("put")); + report_error( + &ERROR_MESSAGES, + "an error occured, aborting connection", Some("put") + ); break; } } @@ -131,10 +148,10 @@ Ok(maybe_response_message) => { let mut result = false; if let Some(response_message) = maybe_response_message { - // warning: this will produce an error if there's more unread responses than - // MPSC_CHANNEL_BUFFER_CAPACITY - // you should then use put_client_blocking_read_cxx in order to dequeue - // the responses in c++ and make room for more + // warning: this will produce an error if there's more unread + // responses than MPSC_CHANNEL_BUFFER_CAPACITY + // you should then use put_client_blocking_read_cxx in order + // to dequeue the responses in c++ and make room for more if let Ok(_) = response_thread_tx .try_send((response_message.data_exists as i32).to_string()) { @@ -167,25 +184,36 @@ }; }); - if let Ok(mut client) = CLIENT.lock() { - *client = Some(BidiClient { + if is_initialized(&holder) { + return Err(format!( + "client initialization overlapped for holder {}", + holder + )); + } + if let Ok(mut clients) = CLIENTS.lock() { + let client = BidiClient { tx: request_thread_tx, rx: response_thread_rx, rx_handle, - }); + }; + (*clients).insert(holder, client); return Ok(()); } - return Err("could not access client".to_string()); + return Err(format!("could not access client for holder {}", holder)); } Err("could not successfully connect to the blob server".to_string()) } -pub fn put_client_blocking_read_cxx() -> Result { +pub fn put_client_blocking_read_cxx( + holder_char: *const c_char, +) -> Result { + let holder = c_char_pointer_to_string(holder_char); let mut response: Option = None; check_error(&ERROR_MESSAGES)?; RUNTIME.block_on(async { - if let Ok(mut maybe_client) = CLIENT.lock() { - if let Some(mut client) = (*maybe_client).take() { + if let Ok(mut clients) = CLIENTS.lock() { + let maybe_client = clients.get_mut(&holder); + if let Some(client) = maybe_client { if let Some(data) = client.rx.recv().await { response = Some(data); } else { @@ -195,9 +223,8 @@ Some("put"), ); } - *maybe_client = Some(client); } else { - report_error(&ERROR_MESSAGES, "no client detected", Some("put")); + report_error(&ERROR_MESSAGES, "no client detected 1", Some("put")); } } else { report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); @@ -214,16 +241,19 @@ * 2 - data chunk (bytes) */ pub fn put_client_write_cxx( + holder_char: *const c_char, field_index: usize, data: *const c_char, ) -> Result<(), String> { + let holder = c_char_pointer_to_string(holder_char); check_error(&ERROR_MESSAGES)?; let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_bytes: Vec = data_c_str.to_bytes().to_vec(); RUNTIME.block_on(async { - if let Ok(mut maybe_client) = CLIENT.lock() { - if let Some(client) = (*maybe_client).take() { + if let Ok(clients) = CLIENTS.lock() { + let maybe_client = clients.get(&holder); + if let Some(client) = maybe_client { match client .tx .send(PutRequestData { @@ -239,9 +269,8 @@ Some("put"), ), } - *maybe_client = Some(client); } else { - report_error(&ERROR_MESSAGES, "no client detected", Some("put")); + report_error(&ERROR_MESSAGES, "no client detected 2", Some("put")); } } else { report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); @@ -250,14 +279,18 @@ Ok(()) } -pub fn put_client_terminate_cxx() -> Result<(), String> { +pub fn put_client_terminate_cxx( + holder_char: *const c_char, +) -> Result<(), String> { + let holder = c_char_pointer_to_string(holder_char); check_error(&ERROR_MESSAGES)?; - if !is_initialized() { + if !is_initialized(&holder) { return Ok(()); } - if let Ok(mut maybe_client) = CLIENT.lock() { - if let Some(client) = (*maybe_client).take() { + if let Ok(mut clients) = CLIENTS.lock() { + let maybe_client = clients.remove(&holder); + if let Some(client) = maybe_client { drop(client.tx); RUNTIME.block_on(async { if client.rx_handle.await.is_err() { @@ -269,14 +302,14 @@ } }); } else { - return Err("no client detected".to_string()); + return Err("no client detected 3".to_string()); } } else { report_error(&ERROR_MESSAGES, "couldn't access client", Some("put")); } assert!( - !is_initialized(), + !is_initialized(&holder), "client transmitter handler released properly" ); check_error(&ERROR_MESSAGES)?;