Page Menu | Home | Phabricator

D4975.diff
No One | Temporary

D4975.diff

diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -17,7 +17,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
#[derive(Debug)]
struct PutRequestData {
@@ -59,17 +58,17 @@
}
fn report_error(message: String) {
- error!("[RUST] Error: {}", message);
+ println!("[RUST] Error: {}", message);
if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
error_messages.push(message);
}
- error!("could not access error messages")
+ panic!("could not access error messages")
}
fn check_error() -> Result<(), String> {
if let Ok(error_messages) = ERROR_MESSAGES.lock() {
if !error_messages.is_empty() {
- return Err(error_messages.join("\n"));
+ return Err(error_messages.join("/"));
}
return Ok(());
} else {
@@ -96,7 +95,7 @@
let outbound = async_stream::stream! {
while let Some(data) = request_thread_rx.recv().await {
println!("[RUST] [transmitter_thread] field index: {}", data.field_index);
- println!("[RUST] [transmitter_thread] data: {:?}", data.data);
+ println!("[RUST] [transmitter_thread] data size: {}", data.data.len());
let request_data: Option<put_request::Data> = match data.field_index {
0 => {
match String::from_utf8(data.data).ok() {
@@ -152,9 +151,6 @@
None
}
};
- if maybe_response.is_none() {
- return;
- }
match maybe_response {
Some(response) => {
let mut inner_response = response.into_inner();
@@ -188,6 +184,9 @@
};
}
}
+ None => {
+ return;
+ }
unexpected => {
report_error(format!("unexpected result received: {:?}", unexpected));
}
@@ -229,6 +228,7 @@
report_error("couldn't access client".to_string());
}
});
+ check_error()?;
response.ok_or("response not received properly".to_string())
}
@@ -241,7 +241,7 @@
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
println!("[RUST] [put_client_process] field index: {}", field_index);
- println!("[RUST] [put_client_process] data string: {:?}", data_bytes);
+ println!("[RUST] [put_client_process] data string size: {}", data_bytes.len());
RUNTIME.block_on(async {
if let Ok(mut client) = CLIENT.lock() {
@@ -256,6 +256,7 @@
} else {
report_error("send data to receiver failed".to_string());
}
+ client.tx = Some(tx);
} else {
report_error("couldn't access client's transmitter".to_string());
}
@@ -270,14 +271,14 @@
// returns Err carrying the reported error messages joined into one string,
// or Ok(()) when no errors were reported
pub fn put_client_terminate_cxx() -> Result<(), String> {
- println!("[RUST] put_client_terminating");
+ println!("[RUST] put_client_terminate_cxx begin");
check_error()?;
if let Ok(mut client) = CLIENT.lock() {
+ if let Some(tx) = client.tx.take() {
+ drop(tx);
+ }
if let Some(receiver_handle) = client.receiver_handle.take() {
- if let Some(tx) = client.tx.take() {
- drop(tx);
- }
RUNTIME.block_on(async {
if receiver_handle.await.is_err() {
report_error("wait for receiver handle failed".to_string());
@@ -292,6 +293,7 @@
!is_initialized(),
"client transmitter handler released properly"
);
+ check_error()?;
println!("[RUST] put_client_terminated");
Ok(())
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -4,6 +4,10 @@
#include "GlobalTools.h"
#include "Tools.h"
+#include "blob_client/src/lib.rs.h"
+
+#include <sstream>
+
namespace comm {
namespace network {
namespace reactor {
@@ -64,12 +68,62 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- // todo:blob perform put:initialize
+ bool dataExists = false;
+ try {
+ put_client_initialize_cxx();
+ put_client_write_cxx(0, this->holder.c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+ put_client_write_cxx(1, this->dataHash.c_str());
+
+ rust::String responseStr =
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably
+ // want to delegate performing ops
+ // to separate threads in the base
+ // reactors
+ const char *value = responseStr.c_str();
+ unsigned int intValue;
+ std::stringstream strValue;
+
+ strValue << value;
+ strValue >> intValue;
+
+ dataExists = (bool)intValue;
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of repeatedly rethrowing here
+ }
+ if (dataExists) {
+ return std::make_unique<ServerBidiReactorStatus>(
+ grpc::Status::OK, true);
+ }
return nullptr;
}
case State::DATA_CHUNKS: {
- // todo:blob perform put:add chunk
- // (std::move(*request.mutable_newcompactionchunk())
+ if (request.mutable_newcompactionchunk()->empty()) {
+ return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
+ }
+ try {
+ put_client_write_cxx(
+ 2,
+ std::string(std::move(*request.mutable_newcompactionchunk()))
+ .c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of repeatedly rethrowing here
+ }
return nullptr;
}
}
@@ -78,8 +132,13 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- // todo:blob perform put:add chunk ("")
- // todo:blob perform put:wait for completion
+ try {
+ put_client_terminate_cxx();
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of repeatedly rethrowing here
+ }
// TODO add recovery data
// TODO handle attachments holders

File Metadata

Mime Type
text/plain
Expires
Fri, Dec 27, 5:16 AM (10 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2710651
Default Alt Text
D4975.diff (7 KB)

Event Timeline