Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3540471
D4975.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Referenced Files
None
Subscribers
None
D4975.diff
View Options
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -17,7 +17,6 @@
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use tracing::error;
#[derive(Debug)]
struct PutRequestData {
@@ -59,17 +58,17 @@
}
fn report_error(message: String) {
- error!("[RUST] Error: {}", message);
+ println!("[RUST] Error: {}", message);
if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
error_messages.push(message);
}
- error!("could not access error messages")
+ panic!("could not access error messages")
}
fn check_error() -> Result<(), String> {
if let Ok(error_messages) = ERROR_MESSAGES.lock() {
if !error_messages.is_empty() {
- return Err(error_messages.join("\n"));
+ return Err(error_messages.join("/"));
}
return Ok(());
} else {
@@ -96,7 +95,7 @@
let outbound = async_stream::stream! {
while let Some(data) = request_thread_rx.recv().await {
println!("[RUST] [transmitter_thread] field index: {}", data.field_index);
- println!("[RUST] [transmitter_thread] data: {:?}", data.data);
+ println!("[RUST] [transmitter_thread] data size: {}", data.data.len());
let request_data: Option<put_request::Data> = match data.field_index {
0 => {
match String::from_utf8(data.data).ok() {
@@ -152,9 +151,6 @@
None
}
};
- if maybe_response.is_none() {
- return;
- }
match maybe_response {
Some(response) => {
let mut inner_response = response.into_inner();
@@ -188,6 +184,9 @@
};
}
}
+ None => {
+ return;
+ }
unexpected => {
report_error(format!("unexpected result received: {:?}", unexpected));
}
@@ -229,6 +228,7 @@
report_error("couldn't access client".to_string());
}
});
+ check_error()?;
response.ok_or("response not received properly".to_string())
}
@@ -241,7 +241,7 @@
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
println!("[RUST] [put_client_process] field index: {}", field_index);
- println!("[RUST] [put_client_process] data string: {:?}", data_bytes);
+ println!("[RUST] [put_client_process] data string size: {}", data_bytes.len());
RUNTIME.block_on(async {
if let Ok(mut client) = CLIENT.lock() {
@@ -256,6 +256,7 @@
} else {
report_error("send data to receiver failed".to_string());
}
+ client.tx = Some(tx);
} else {
report_error("couldn't access client's transmitter".to_string());
}
@@ -270,14 +271,14 @@
// returns vector of error messages
// empty vector indicates that there were no errors
pub fn put_client_terminate_cxx() -> Result<(), String> {
- println!("[RUST] put_client_terminating");
+ println!("[RUST] put_client_terminate_cxx begin");
check_error()?;
if let Ok(mut client) = CLIENT.lock() {
+ if let Some(tx) = client.tx.take() {
+ drop(tx);
+ }
if let Some(receiver_handle) = client.receiver_handle.take() {
- if let Some(tx) = client.tx.take() {
- drop(tx);
- }
RUNTIME.block_on(async {
if receiver_handle.await.is_err() {
report_error("wait for receiver handle failed".to_string());
@@ -292,6 +293,7 @@
!is_initialized(),
"client transmitter handler released properly"
);
+ check_error()?;
println!("[RUST] put_client_terminated");
Ok(())
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -4,6 +4,10 @@
#include "GlobalTools.h"
#include "Tools.h"
+#include "blob_client/src/lib.rs.h"
+
+#include <sstream>
+
namespace comm {
namespace network {
namespace reactor {
@@ -64,12 +68,62 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- // todo:blob perform put:initialize
+ bool dataExists = false;
+ try {
+ put_client_initialize_cxx();
+ put_client_write_cxx(0, this->holder.c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+ put_client_write_cxx(1, this->dataHash.c_str());
+
+ rust::String responseStr =
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably
+ // want to delegate performing ops
+ // to separate threads in the base
+ // reactors
+ const char *value = responseStr.c_str();
+ unsigned int intValue;
+ std::stringstream strValue;
+
+ strValue << value;
+ strValue >> intValue;
+
+ dataExists = (bool)intValue;
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of keep rethrowing here
+ }
+ if (dataExists) {
+ return std::make_unique<ServerBidiReactorStatus>(
+ grpc::Status::OK, true);
+ }
return nullptr;
}
case State::DATA_CHUNKS: {
- // todo:blob perform put:add chunk
- // (std::move(*request.mutable_newcompactionchunk())
+ if (request.mutable_newcompactionchunk()->empty()) {
+ return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
+ }
+ try {
+ put_client_write_cxx(
+ 2,
+ std::string(std::move(*request.mutable_newcompactionchunk()))
+ .c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of keep rethrowing here
+ }
return nullptr;
}
}
@@ -78,8 +132,13 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- // todo:blob perform put:add chunk ("")
- // todo:blob perform put:wait for completion
+ try {
+ put_client_terminate_cxx();
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of keep rethrowing here
+ }
// TODO add recovery data
// TODO handle attachments holders
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Dec 27, 5:16 AM (10 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2710651
Default Alt Text
D4975.diff (7 KB)
Attached To
Mode
D4975: [services] Backup - Connect to Blob - Implement CreateNewBackup
Attached
Detach File
Event Timeline
Log In to Comment