diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -17,7 +17,6 @@ use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::error; #[derive(Debug)] struct PutRequestData { @@ -59,17 +58,17 @@ } fn report_error(message: String) { - error!("[RUST] Error: {}", message); + println!("[RUST] Error: {}", message); if let Ok(mut error_messages) = ERROR_MESSAGES.lock() { error_messages.push(message); } - error!("could not access error messages") + panic!("could not access error messages") } fn check_error() -> Result<(), String> { if let Ok(error_messages) = ERROR_MESSAGES.lock() { if !error_messages.is_empty() { - return Err(error_messages.join("\n")); + return Err(error_messages.join("/")); } return Ok(()); } else { @@ -96,7 +95,7 @@ let outbound = async_stream::stream! { while let Some(data) = request_thread_rx.recv().await { println!("[RUST] [transmitter_thread] field index: {}", data.field_index); - println!("[RUST] [transmitter_thread] data: {:?}", data.data); + println!("[RUST] [transmitter_thread] data size: {}", data.data.len()); let request_data: Option = match data.field_index { 0 => { match String::from_utf8(data.data).ok() { @@ -152,9 +151,6 @@ None } }; - if maybe_response.is_none() { - return; - } match maybe_response { Some(response) => { let mut inner_response = response.into_inner(); @@ -188,6 +184,9 @@ }; } } + None => { + return; + } unexpected => { report_error(format!("unexpected result received: {:?}", unexpected)); } @@ -229,6 +228,7 @@ report_error("couldn't access client".to_string()); } }); + check_error()?; response.ok_or("response not received properly".to_string()) } @@ -241,7 +241,7 @@ let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_bytes: Vec = data_c_str.to_bytes().to_vec(); println!("[RUST] [put_client_process] field index: {}", field_index); - println!("[RUST] [put_client_process] data string: {:?}", data_bytes); + println!("[RUST] [put_client_process] data string size: {}", data_bytes.len()); RUNTIME.block_on(async { if let Ok(mut client) = CLIENT.lock() { @@ -256,6 +256,7 @@ } else { report_error("send data to receiver failed".to_string()); } + client.tx = Some(tx); } else { report_error("couldn't access client's transmitter".to_string()); } @@ -270,14 +271,14 @@ // returns vector of error messages // empty vector indicates that there were no errors pub fn put_client_terminate_cxx() -> Result<(), String> { - println!("[RUST] put_client_terminating"); + println!("[RUST] put_client_terminate_cxx begin"); check_error()?; if let Ok(mut client) = CLIENT.lock() { + if let Some(tx) = client.tx.take() { + drop(tx); + } if let Some(receiver_handle) = client.receiver_handle.take() { - if let Some(tx) = client.tx.take() { - drop(tx); - } RUNTIME.block_on(async { if receiver_handle.await.is_err() { report_error("wait for receiver handle failed".to_string()); @@ -292,6 +293,7 @@ !is_initialized(), "client transmitter handler released properly" ); + check_error()?; println!("[RUST] put_client_terminated"); Ok(()) } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -4,6 +4,10 @@ #include "GlobalTools.h" #include "Tools.h" +#include "blob_client/src/lib.rs.h" + +#include + namespace comm { namespace network { namespace reactor { @@ -64,12 +68,62 @@ } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - // todo:blob perform put:initialize + bool dataExists = false; + try { + put_client_initialize_cxx(); + put_client_write_cxx(0, this->holder.c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + put_client_write_cxx(1, this->dataHash.c_str()); + + rust::String responseStr = + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors + const char *value = responseStr.c_str(); + unsigned int intValue; + std::stringstream strValue; + + strValue << value; + strValue >> intValue; + + dataExists = (bool)intValue; + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } + if (dataExists) { + return std::make_unique( + grpc::Status::OK, true); + } return nullptr; } case State::DATA_CHUNKS: { - // todo:blob perform put:add chunk - // (std::move(*request.mutable_newcompactionchunk()) + if (request.mutable_newcompactionchunk()->empty()) { + return std::make_unique(grpc::Status::OK); + } + try { + put_client_write_cxx( + 2, + std::string(std::move(*request.mutable_newcompactionchunk())) + .c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } return nullptr; } } @@ -78,8 +132,13 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - // todo:blob perform put:add chunk ("") - // todo:blob perform put:wait for completion + try { + put_client_terminate_cxx(); + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } // TODO add recovery data // TODO handle attachments holders