diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,4 @@
 
 # Nix
 result*
+services/backup/rust_lib/target/
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/blob_client/src/put_client.rs
@@ -0,0 +1,261 @@
+mod proto {
+  tonic::include_proto!("blob");
+}
+
+use proto::blob_service_client::BlobServiceClient;
+use proto::put_request;
+use proto::put_request::Data::*;
+use proto::PutRequest;
+use proto::PutResponse;
+
+use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
+use lazy_static::lazy_static;
+use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+use libc;
+use libc::c_char;
+use std::ffi::CStr;
+
+/// A single outgoing item: `field_index` selects which `put_request::Data`
+/// variant `data` is wrapped in (0 = holder, 1 = blob hash, 2 = data chunk).
+#[derive(Debug)]
+struct PutRequestData {
+  field_index: usize,
+  data: String,
+}
+
+/// State shared between the exported functions of the put client.
+struct BidiClient {
+  /// Feeds requests into the outbound gRPC stream.
+  tx: Option<mpsc::Sender<PutRequestData>>,
+
+  /// Yields the responses forwarded by the receiver task.
+  rx: Option<mpsc::Receiver<String>>,
+  /// Handle of the task that drives the `put` call and reads responses.
+  receiver_handle: Option<JoinHandle<()>>,
+}
+
+lazy_static! {
+  static ref CLIENT: Arc<Mutex<BidiClient>> =
+    Arc::new(Mutex::new(BidiClient {
+      tx: None,
+      rx: None,
+      receiver_handle: None,
+    }));
+  static ref RUNTIME: Runtime = Runtime::new().unwrap();
+  static ref ERROR_MESSAGES: Arc<Mutex<Vec<String>>> =
+    Arc::new(Mutex::new(Vec::new()));
+}
+
+/// Returns `true` only when the transmitter, the receiver and the receiver
+/// task handle are all present.
+fn is_initialized() -> bool {
+  // a single lock gives one consistent view of all three fields
+  let client = CLIENT.lock().expect("access client");
+  client.tx.is_some()
+    && client.rx.is_some()
+    && client.receiver_handle.is_some()
+}
+
+/// Records an error so that later calls can surface it via `check_error`.
+fn report_error(message: String) {
+  ERROR_MESSAGES
+    .lock()
+    .expect("access error messages")
+    .push(message);
+}
+
+/// Returns every error reported so far joined with newlines, or `Ok(())` when
+/// none has been reported. The stored messages are not cleared.
+fn check_error() -> Result<(), String> {
+  let errors = ERROR_MESSAGES.lock().expect("access error messages");
+  if errors.is_empty() {
+    Ok(())
+  } else {
+    Err(errors.join("\n"))
+  }
+}
+
+/// Connects to the blob service, opens the bidirectional `put` stream and
+/// spawns the task that forwards every response into the response channel.
+///
+/// Returns an error when the connection cannot be established.
+///
+/// Panics when the client is already initialized.
+pub fn put_client_initialize_cxx() -> Result<(), String> {
+  println!("[RUST] initializing");
+  assert!(!is_initialized(), "client cannot be initialized twice");
+  // grpc
+  let mut grpc_client = RUNTIME
+    .block_on(BlobServiceClient::connect(BLOB_ADDRESS))
+    .map_err(|err| format!("cannot connect to the blob server: {}", err))?;
+
+  let (request_thread_tx, mut request_thread_rx) =
+    mpsc::channel::<PutRequestData>(MPSC_CHANNEL_BUFFER_CAPACITY);
+
+  let outbound = async_stream::stream! {
+    while let Some(data) = request_thread_rx.recv().await {
+      println!("[RUST] [transmitter_thread] field index: {}", data.field_index);
+      println!("[RUST] [transmitter_thread] data: {}", data.data);
+      let request_data: put_request::Data = match data.field_index {
+        0 => Holder(data.data),
+        1 => BlobHash(data.data),
+        2 => DataChunk(data.data.into_bytes()),
+        _ => panic!("invalid field index value {}", data.field_index)
+      };
+      let request = PutRequest {
+        data: Some(request_data),
+      };
+      yield request;
+    }
+  };
+
+  // spawn receiver thread
+  let (response_thread_tx, response_thread_rx) =
+    mpsc::channel::<String>(MPSC_CHANNEL_BUFFER_CAPACITY);
+  let receiver_handle = RUNTIME.spawn(async move {
+    println!("[RUST] [receiver_thread] begin");
+    let mut inbound =
+      match grpc_client.put(tonic::Request::new(outbound)).await {
+        Ok(response) => response.into_inner(),
+        Err(err) => {
+          report_error(err.to_string());
+          println!("ERROR!! {}", err);
+          return;
+        }
+      };
+    loop {
+      let response: PutResponse = match inbound.message().await {
+        Ok(Some(response)) => response,
+        // the response stream has ended
+        Ok(None) => break,
+        Err(err) => {
+          report_error(err.to_string());
+          println!("ERROR!! {}", err);
+          break;
+        }
+      };
+      println!("[RUST] got response: {}", response.data_exists);
+      // warning: this will hang if there's more unread responses than MPSC_CHANNEL_BUFFER_CAPACITY
+      // you should then use put_client_blocking_read_cxx in order to dequeue the responses in c++ and make room for more
+      response_thread_tx
+        .send((response.data_exists as i32).to_string())
+        .await
+        .unwrap();
+    }
+    println!("[RUST] [receiver_thread] done");
+  });
+
+  // store everything under a single lock
+  let mut client = CLIENT.lock().expect("access client");
+  client.tx = Some(request_thread_tx);
+  client.receiver_handle = Some(receiver_handle);
+  client.rx = Some(response_thread_rx);
+  println!("[RUST] initialized");
+  Ok(())
+}
+
+/// Blocks until the receiver task forwards the next response and returns it.
+///
+/// Returns an error when an error has been reported, when the client has no
+/// receiver, or when the response channel is closed.
+pub fn put_client_blocking_read_cxx() -> Result<String, String> {
+  check_error()?;
+  // take the receiver out so the client mutex is not held while waiting
+  let mut rx: mpsc::Receiver<String> = CLIENT
+    .lock()
+    .expect("access client")
+    .rx
+    .take()
+    .ok_or_else(|| "client's receiver is not available".to_string())?;
+  let response = RUNTIME.block_on(rx.recv());
+  CLIENT.lock().expect("access client").rx = Some(rx);
+  match response {
+    Some(data) => {
+      println!("received data {}", data);
+      Ok(data)
+    }
+    None => {
+      // prefer the error that made the receiver task stop, if there is one
+      check_error()?;
+      Err("response not received properly".to_string())
+    }
+  }
+}
+
+/// Queues `data` to be sent to the blob service as the request field selected
+/// by `field_index` (0 = holder, 1 = blob hash, 2 = data chunk).
+///
+/// Returns an error when an error has been reported, `field_index` is out of
+/// range, `data` is null or not valid UTF-8, or the client has no transmitter.
+///
+/// The caller must make sure that a non-null `data` points to a NUL-terminated
+/// string that stays valid for the duration of the call.
+pub fn put_client_write_cxx(
+  field_index: usize,
+  data: *const c_char,
+) -> Result<(), String> {
+  println!("[RUST] [put_client_process] begin");
+  check_error()?;
+  // reject the index here, otherwise the outbound stream would panic on it
+  if field_index > 2 {
+    return Err(format!("invalid field index value {}", field_index));
+  }
+  if data.is_null() {
+    return Err("data pointer is null".to_string());
+  }
+  // SAFETY: `data` is non-null (checked above) and the caller guarantees it
+  // points to a NUL-terminated string that outlives this call.
+  let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
+  let data_str: String = data_c_str
+    .to_str()
+    .map_err(|err| format!("data is not valid UTF-8: {}", err))?
+    .to_owned();
+  println!("[RUST] [put_client_process] field index: {}", field_index);
+  println!("[RUST] [put_client_process] data string: {}", data_str);
+
+  // clone the sender so the client mutex is not held across the await
+  let tx = CLIENT
+    .lock()
+    .expect("access client")
+    .tx
+    .clone()
+    .ok_or_else(|| "client's transmitter is not available".to_string())?;
+  RUNTIME
+    .block_on(tx.send(PutRequestData {
+      field_index,
+      data: data_str,
+    }))
+    .map_err(|err| format!("cannot send data to the receiver: {}", err))?;
+  println!("[RUST] [put_client_process] end");
+  Ok(())
+}
+
+/// Closes the request stream and waits for the receiver task to finish.
+///
+/// Returns an error when an error has been reported, when the client is not
+/// initialized, or when the receiver task panicked or was cancelled.
+pub fn put_client_terminate_cxx() -> Result<(), String> {
+  println!("[RUST] put_client_terminating");
+  check_error()?;
+  let (tx, receiver_handle) = {
+    let mut client = CLIENT.lock().expect("access client");
+    (client.tx.take(), client.receiver_handle.take())
+  };
+  let receiver_handle =
+    receiver_handle.ok_or_else(|| "client is not initialized".to_string())?;
+  // dropping the sender ends the outbound request stream
+  drop(tx);
+  RUNTIME
+    .block_on(receiver_handle)
+    .map_err(|err| format!("receiver task failed: {}", err))?;
+
+  assert!(
+    !is_initialized(),
+    "client transmitter handler released properly"
+  );
+  println!("[RUST] put_client_terminated");
+  Ok(())
+}
diff --git a/services/backup/rust_lib/Cargo.lock b/services/backup/rust_lib/Cargo.lock
--- a/services/backup/rust_lib/Cargo.lock
+++ b/services/backup/rust_lib/Cargo.lock
@@ -2,6 +2,12 @@
 # It is not intended for manual editing.
 version = 3
 
+[[package]]
+name = "autocfg"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+
 [[package]]
 name = "cc"
 version = "1.0.73"
@@ -62,6 +68,33 @@
  "syn",
 ]
 
+[[package]]
+name = "futures-core"
+version = "0.3.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115"
+
+[[package]]
+name = "hermit-abi"
+version = "0.1.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "lazy_static"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+
+[[package]]
+name = "libc"
+version = "0.2.132"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
+
 [[package]]
 name = "link-cplusplus"
 version = "1.0.6"
@@ -71,12 +104,28 @@
  "cc",
 ]
 
+[[package]]
+name = "num_cpus"
+version = "1.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
 [[package]]
 name = "once_cell"
 version = "1.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1"
 
+[[package]]
+name = "pin-project-lite"
+version = "0.2.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.43"
@@ -101,6 +150,10 @@
 dependencies = [
  "cxx",
  "cxx-build",
+ "lazy_static",
+ "libc",
+ "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
@@ -129,6 +182,41 @@
  "winapi-util",
 ]
 
+[[package]]
+name = "tokio"
+version = "1.20.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581"
+dependencies = [
+ "autocfg",
+ "num_cpus",
+ "once_cell",
+ "pin-project-lite",
+ "tokio-macros",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tokio-stream"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "unicode-ident"
 version = "1.0.3"
diff --git a/services/backup/rust_lib/Cargo.toml b/services/backup/rust_lib/Cargo.toml
--- a/services/backup/rust_lib/Cargo.toml
+++ b/services/backup/rust_lib/Cargo.toml
@@ -5,6 +5,10 @@
 
 [dependencies]
 cxx = "1.0"
+tokio = { version = "1.20", features = ["macros", "rt-multi-thread"] }
+tokio-stream = "0.1"
+lazy_static = "1.4"
+libc = "0.2"
 
 [build-dependencies]
 cxx-build = "1.0"
diff --git a/services/backup/rust_lib/src/constants.rs b/services/backup/rust_lib/src/constants.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/rust_lib/src/constants.rs
@@ -0,0 +1 @@
+pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
diff --git a/services/backup/rust_lib/src/lib.rs b/services/backup/rust_lib/src/lib.rs
--- a/services/backup/rust_lib/src/lib.rs
+++ b/services/backup/rust_lib/src/lib.rs
@@ -1,10 +1,54 @@
 #[cxx::bridge]
 mod ffi {
   extern "Rust" {
-    fn test_function() -> i32;
+    fn rust_is_initialized_cxx() -> bool;
+    fn rust_initialize_cxx() -> ();
+    unsafe fn rust_process_cxx(_: *const c_char) -> ();
+    fn rust_terminate_cxx() -> ();
   }
 }
 
-pub fn test_function() -> i32 {
-  0
+use lazy_static::lazy_static;
+use libc;
+use libc::c_char;
+use std::sync::{Arc, Mutex};
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+/// State shared between the functions exported to C++.
+pub struct Client {
+  // NOTE(review): the channel payload type is assumed to be `String` - confirm
+  tx: Option<mpsc::Sender<String>>,
+  handle: Option<JoinHandle<()>>,
+}
+
+lazy_static! {
+  pub static ref CLIENT: Arc<Mutex<Client>> = Arc::new(Mutex::new(Client {
+    tx: None,
+    handle: None
+  }));
+  pub static ref RUNTIME: Runtime = Runtime::new().unwrap();
+}
+
+/// Returns `true` only when both the transmitter and the task handle are set.
+pub fn rust_is_initialized_cxx() -> bool {
+  // a single lock gives one consistent view of both fields
+  let client = CLIENT.lock().expect("access client");
+  client.tx.is_some() && client.handle.is_some()
+}
+
+/// Not implemented yet; calling it panics.
+pub fn rust_initialize_cxx() -> () {
+  unimplemented!();
+}
+
+/// Not implemented yet; calling it panics.
+pub fn rust_process_cxx(_: *const c_char) -> () {
+  unimplemented!();
+}
+
+/// Not implemented yet; calling it panics.
+pub fn rust_terminate_cxx() -> () {
+  unimplemented!();
 }