Page MenuHomePhabricator

D4884.id15978.diff
No OneTemporary

D4884.id15978.diff

diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,4 @@
# Nix
result*
+services/backup/rust_lib/target/
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/blob_client/src/put_client.rs
@@ -0,0 +1,245 @@
+mod proto {
+ tonic::include_proto!("blob");
+}
+
+use proto::blob_service_client::BlobServiceClient;
+use proto::put_request;
+use proto::put_request::Data::*;
+use proto::PutRequest;
+use proto::PutResponse;
+
+use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use lazy_static::lazy_static;
+use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+use libc;
+use libc::c_char;
+use std::ffi::CStr;
+
+#[derive(Debug)]
+struct PutRequestData {
+ field_index: usize,
+ data: String,
+}
+
+struct BidiClient {
+ tx: Option<mpsc::Sender<PutRequestData>>,
+
+ rx: Option<mpsc::Receiver<String>>,
+ receiver_handle: Option<JoinHandle<()>>,
+}
+
+lazy_static! {
+ static ref CLIENT: Arc<Mutex<BidiClient>> =
+ Arc::new(Mutex::new(BidiClient {
+ tx: None,
+ rx: None,
+ receiver_handle: None,
+ }));
+ static ref RUNTIME: Runtime = Runtime::new().unwrap();
+ static ref ERROR_MESSAGES: Arc<Mutex<Vec<String>>> =
+ Arc::new(Mutex::new(Vec::new()));
+}
+
+fn is_initialized() -> bool {
+ if CLIENT.lock().expect("access client").tx.is_none() {
+ return false;
+ }
+ if CLIENT.lock().expect("access client").rx.is_none() {
+ return false;
+ }
+ if CLIENT
+ .lock()
+ .expect("access client")
+ .receiver_handle
+ .is_none()
+ {
+ return false;
+ }
+ return true;
+}
+
+fn report_error(message: String) {
+ ERROR_MESSAGES
+ .lock()
+ .expect("access error messages")
+ .push(message);
+}
+
+fn check_error() -> Result<(), String> {
+ let errors = ERROR_MESSAGES.lock().expect("access error messages");
+ let mut errors_str_value = None;
+ if !errors.is_empty() {
+ errors_str_value = Some(errors.join("\n"));
+ }
+ return match errors_str_value {
+ Some(value) => Err(value),
+ None => Ok(()),
+ };
+}
+
+pub fn put_client_initialize_cxx() -> Result<(), String> {
+ println!("[RUST] initializing");
+ assert!(!is_initialized(), "client cannot be initialized twice");
+ // grpc
+ let mut grpc_client: Option<BlobServiceClient<tonic::transport::Channel>> =
+ None;
+ RUNTIME.block_on(async {
+ grpc_client = Some(
+ BlobServiceClient::connect(BLOB_ADDRESS)
+ .await
+ .expect("successfully connect to the blob server"),
+ );
+ });
+
+ let (request_thread_tx, mut request_thread_rx): (
+ mpsc::Sender<PutRequestData>,
+ mpsc::Receiver<PutRequestData>,
+ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+
+ let outbound = async_stream::stream! {
+ while let Some(data) = request_thread_rx.recv().await {
+ println!("[RUST] [transmitter_thread] field index: {}", data.field_index);
+ println!("[RUST] [transmitter_thread] data: {}", data.data);
+ let request_data: put_request::Data = match data.field_index {
+ 0 => Holder(data.data),
+ 1 => BlobHash(data.data),
+ 2 => DataChunk(data.data.into_bytes()),
+ _ => panic!("invalid field index value {}", data.field_index)
+ };
+ let request = PutRequest {
+ data: Some(request_data),
+ };
+ yield request;
+ }
+ };
+
+ // spawn receiver thread
+ let (response_thread_tx, response_thread_rx): (
+ mpsc::Sender<String>,
+ mpsc::Receiver<String>,
+ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let receiver_handle = RUNTIME.spawn(async move {
+ println!("[RUST] [receiver_thread] begin");
+ let response: Option<
+ tonic::Response<tonic::codec::Streaming<PutResponse>>,
+ > = match grpc_client
+ .expect("access grpc client")
+ .put(tonic::Request::new(outbound))
+ .await
+ {
+ Ok(res) => Some(res),
+ Err(err) => {
+ report_error(err.to_string());
+ println!("ERROR!! {}", err.to_string());
+ None
+ }
+ };
+ if response.is_none() {
+ return;
+ }
+ let mut inbound = response.unwrap().into_inner();
+ loop {
+ let response: Option<PutResponse> = match inbound.message().await {
+ Ok(res) => res,
+ Err(err) => {
+ report_error(err.to_string());
+ println!("ERROR!! {}", err.to_string());
+ None
+ }
+ };
+ if response.is_none() {
+ break;
+ }
+ let response: PutResponse = response.unwrap();
+ println!("[RUST] got response: {}", response.data_exists);
+ // warning: this will hang if there's more unread responses than MPSC_CHANNEL_BUFFER_CAPACITY
+ // you should then use put_client_blocking_read_cxx in order to dequeue the responses in c++ and make room for more
+ response_thread_tx
+ .send((response.data_exists as i32).to_string())
+ .await
+ .unwrap();
+ }
+ println!("[RUST] [receiver_thread] done");
+ });
+
+ CLIENT.lock().expect("access client").tx = Some(request_thread_tx);
+ CLIENT.lock().expect("access client").receiver_handle = Some(receiver_handle);
+ CLIENT.lock().expect("access client").rx = Some(response_thread_rx);
+ println!("[RUST] initialized");
+ Ok(())
+}
+
+pub fn put_client_blocking_read_cxx() -> Result<String, String> {
+ let mut response: Option<String> = None;
+ check_error()?;
+ RUNTIME.block_on(async {
+ let mut rx: mpsc::Receiver<String> = CLIENT
+ .lock()
+ .expect("access client")
+ .rx
+ .take()
+ .expect("access client's receiver");
+ if let Some(data) = rx.recv().await {
+ println!("received data {}", data);
+ response = Some(data);
+ }
+ CLIENT.lock().expect("access client").rx = Some(rx);
+ });
+ if response.is_none() {
+ return Err("response not received properly".to_string());
+ }
+ Ok(response.unwrap())
+}
+
+pub fn put_client_write_cxx(
+ field_index: usize,
+ data: *const c_char,
+) -> Result<(), String> {
+ println!("[RUST] [put_client_process] begin");
+ check_error()?;
+ let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
+ let data_str: String = data_c_str.to_str().unwrap().to_owned();
+ println!("[RUST] [put_client_process] field index: {}", field_index);
+ println!("[RUST] [put_client_process] data string: {}", data_str);
+
+ RUNTIME.block_on(async {
+ CLIENT
+ .lock()
+ .expect("access client")
+ .tx
+ .as_ref()
+ .expect("access client's transmitter")
+ .send(str)
+ .await
+ .expect("send data to receiver");
+ });
+ println!("[RUST] [put_client_process] end");
+ Ok(())
+}
+
+// returns vector of error messages
+// empty vector indicates that there were no errors
+pub fn put_client_terminate_cxx() -> Result<(), String> {
+ println!("[RUST] put_client_terminating");
+ check_error()?;
+ let receiver_handle = CLIENT
+ .lock()
+ .expect("access client")
+ .receiver_handle
+ .take()
+ .unwrap();
+ drop(CLIENT.lock().expect("access client").tx.take().unwrap());
+ RUNTIME.block_on(async {
+ receiver_handle.await.unwrap();
+ });
+
+ assert!(
+ !is_initialized(),
+ "client transmitter handler released properly"
+ );
+ println!("[RUST] put_client_terminated");
+ Ok(())
+}
diff --git a/services/backup/rust_lib/Cargo.lock b/services/backup/rust_lib/Cargo.lock
--- a/services/backup/rust_lib/Cargo.lock
+++ b/services/backup/rust_lib/Cargo.lock
@@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
+[[package]]
+name = "autocfg"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+
[[package]]
name = "cc"
version = "1.0.73"
@@ -62,6 +68,33 @@
"syn",
]
+[[package]]
+name = "futures-core"
+version = "0.3.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115"
+
+[[package]]
+name = "hermit-abi"
+version = "0.1.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "lazy_static"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+
+[[package]]
+name = "libc"
+version = "0.2.132"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
+
[[package]]
name = "link-cplusplus"
version = "1.0.6"
@@ -71,12 +104,28 @@
"cc",
]
+[[package]]
+name = "num_cpus"
+version = "1.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
[[package]]
name = "once_cell"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1"
+[[package]]
+name = "pin-project-lite"
+version = "0.2.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
+
[[package]]
name = "proc-macro2"
version = "1.0.43"
@@ -101,6 +150,10 @@
dependencies = [
"cxx",
"cxx-build",
+ "lazy_static",
+ "libc",
+ "tokio",
+ "tokio-stream",
]
[[package]]
@@ -129,6 +182,41 @@
"winapi-util",
]
+[[package]]
+name = "tokio"
+version = "1.20.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581"
+dependencies = [
+ "autocfg",
+ "num_cpus",
+ "once_cell",
+ "pin-project-lite",
+ "tokio-macros",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tokio-stream"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "unicode-ident"
version = "1.0.3"
diff --git a/services/backup/rust_lib/Cargo.toml b/services/backup/rust_lib/Cargo.toml
--- a/services/backup/rust_lib/Cargo.toml
+++ b/services/backup/rust_lib/Cargo.toml
@@ -5,6 +5,10 @@
[dependencies]
cxx = "1.0"
+tokio = { version = "1.20", features = ["macros", "rt-multi-thread"] }
+tokio-stream = "0.1"
+lazy_static = "1.4"
+libc = "0.2"
[build-dependencies]
cxx-build = "1.0"
diff --git a/services/backup/rust_lib/src/constants.rs b/services/backup/rust_lib/src/constants.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/rust_lib/src/constants.rs
@@ -0,0 +1 @@
+pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
diff --git a/services/backup/rust_lib/src/lib.rs b/services/backup/rust_lib/src/lib.rs
--- a/services/backup/rust_lib/src/lib.rs
+++ b/services/backup/rust_lib/src/lib.rs
@@ -1,10 +1,52 @@
#[cxx::bridge]
mod ffi {
extern "Rust" {
- fn test_function() -> i32;
+ fn rust_is_initialized_cxx() -> bool;
+ fn rust_initialize_cxx() -> ();
+ unsafe fn rust_process_cxx(_: *const c_char) -> ();
+ fn rust_terminate_cxx() -> ();
}
}
-pub fn test_function() -> i32 {
- 0
+use lazy_static::lazy_static;
+use libc;
+use libc::c_char;
+use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+pub struct Client {
+ tx: Option<mpsc::Sender<String>>,
+ handle: Option<JoinHandle<()>>,
+}
+
+lazy_static! {
+ pub static ref CLIENT: Arc<Mutex<Client>> = Arc::new(Mutex::new(Client {
+ tx: None,
+ handle: None
+ }));
+ pub static ref RUNTIME: Runtime = Runtime::new().unwrap();
+}
+
+pub fn rust_is_initialized_cxx() -> bool {
+ if CLIENT.lock().expect("access client").tx.is_none() {
+ return false;
+ }
+ if CLIENT.lock().expect("access client").handle.is_none() {
+ return false;
+ }
+ return true;
+}
+
+pub fn rust_initialize_cxx() -> () {
+ unimplemented!();
+}
+
+pub fn rust_process_cxx(_: *const c_char) -> () {
+ unimplemented!();
+}
+
+pub fn rust_terminate_cxx() -> () {
+ unimplemented!();
}

File Metadata

Mime Type
text/plain
Expires
Sun, Nov 17, 1:17 AM (19 h, 18 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2509699
Default Alt Text
D4884.id15978.diff (12 KB)

Event Timeline