diff --git a/services/backup/blob_client/Cargo.lock b/services/backup/blob_client/Cargo.lock --- a/services/backup/blob_client/Cargo.lock +++ b/services/backup/blob_client/Cargo.lock @@ -114,6 +114,7 @@ name = "blob_client" version = "0.1.0" dependencies = [ + "async-stream", "cxx", "cxx-build", "lazy_static", diff --git a/services/backup/blob_client/Cargo.toml b/services/backup/blob_client/Cargo.toml --- a/services/backup/blob_client/Cargo.toml +++ b/services/backup/blob_client/Cargo.toml @@ -12,6 +12,7 @@ tonic = "0.8" prost = "0.11" tracing = "0.1" +async-stream = "0.3.2" [build-dependencies] cxx-build = "1.0" diff --git a/services/backup/blob_client/src/constants.rs b/services/backup/blob_client/src/constants.rs --- a/services/backup/blob_client/src/constants.rs +++ b/services/backup/blob_client/src/constants.rs @@ -1,2 +1,2 @@ pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; -pub const BLOB_ADDRESS: &str = "blob-server:50051"; +pub const BLOB_ADDRESS: &str = "http://blob-server:50051"; diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -1,4 +1,14 @@ -use crate::constants::{MPSC_CHANNEL_BUFFER_CAPACITY}; +mod proto { + tonic::include_proto!("blob"); +} + +use proto::blob_service_client::BlobServiceClient; +use proto::put_request; +use proto::put_request::Data::*; +use proto::PutRequest; +use proto::PutResponse; + +use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; use lazy_static::lazy_static; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; @@ -8,9 +18,14 @@ use libc::c_char; use std::ffi::CStr; +#[derive(Debug)] +struct PutRequestData { + field_index: usize, + data: String, +} + struct BidiClient { - tx: Option>, - transmitter_handle: Option>, + tx: Option>, rx: Option>, receiver_handle: Option>, @@ -20,7 +35,6 @@ static ref CLIENT: Arc> = Arc::new(Mutex::new(BidiClient { tx: None, - transmitter_handle: None, rx: None, receiver_handle: None, })); @@ -33,10 +47,13 @@ if CLIENT.lock().expect("access client").tx.is_none() { return false; } + if CLIENT.lock().expect("access client").rx.is_none() { + return false; + } if CLIENT .lock() .expect("access client") - .transmitter_handle + .receiver_handle .is_none() { return false; @@ -66,35 +83,91 @@ pub fn put_client_initialize_cxx() -> Result<(), String> { println!("[RUST] initializing"); assert!(!is_initialized(), "client cannot be initialized twice"); - // spawn transmitter thread - let (transmitter_thread_tx, mut transmitter_thread_rx): ( - mpsc::Sender, - mpsc::Receiver, + // grpc + let mut grpc_client: Option> = + None; + RUNTIME.block_on(async { + grpc_client = Some( + BlobServiceClient::connect(BLOB_ADDRESS) + .await + .expect("successfully connect to the blob server"), + ); + }); + + let (request_thread_tx, mut request_thread_rx): ( + mpsc::Sender, + mpsc::Receiver, ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let transmitter_handle = RUNTIME.spawn(async move { - println!("[RUST] [transmitter_thread] begin"); - while let Some(data) = transmitter_thread_rx.recv().await { - println!("[RUST] [transmitter_thread] data: {}", data); - // todo: send throug grpc here + + let outbound = async_stream::stream! { + while let Some(data) = request_thread_rx.recv().await { + println!("[RUST] [transmitter_thread] field index: {}", data.field_index); + println!("[RUST] [transmitter_thread] data: {}", data.data); + let request_data: put_request::Data = match data.field_index { + 0 => Holder(data.data), + 1 => BlobHash(data.data), + 2 => DataChunk(data.data.into_bytes()), + _ => panic!("invalid field index value {}", data.field_index) + }; + let request = PutRequest { + data: Some(request_data), + }; + yield request; } - println!("[RUST] [transmitter_thread] done"); - }); + }; + // spawn receiver thread - let (receiver_thread_tx, mut receiver_thread_rx): ( + let (response_thread_tx, response_thread_rx): ( mpsc::Sender, mpsc::Receiver, ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let receiver_handle = RUNTIME.spawn(async move { println!("[RUST] [receiver_thread] begin"); - // here get responses from grpc and send them with the receiver_thread_tx + let response: Option< + tonic::Response>, + > = match grpc_client + .expect("access grpc client") + .put(tonic::Request::new(outbound)) + .await + { + Ok(res) => Some(res), + Err(err) => { + report_error(err.to_string()); + println!("ERROR!! {}", err.to_string()); + None + } + }; + if response.is_none() { + return; + } + let mut inbound = response.unwrap().into_inner(); + loop { + let response: Option = match inbound.message().await { + Ok(res) => res, + Err(err) => { + report_error(err.to_string()); + println!("ERROR!! {}", err.to_string()); + None + } + }; + if response.is_none() { + break; + } + let response: PutResponse = response.unwrap(); + println!("[RUST] got response: {}", response.data_exists); + // warning: this will hang if there's more unread responses than MPSC_CHANNEL_BUFFER_CAPACITY + // you should then use put_client_blocking_read_cxx in order to dequeue the responses in c++ and make room for more + response_thread_tx + .send((response.data_exists as i32).to_string()) + .await + .unwrap(); + } println!("[RUST] [receiver_thread] done"); }); - CLIENT.lock().expect("access client").transmitter_handle = - Some(transmitter_handle); - CLIENT.lock().expect("access client").tx = Some(transmitter_thread_tx); + CLIENT.lock().expect("access client").tx = Some(request_thread_tx); CLIENT.lock().expect("access client").receiver_handle = Some(receiver_handle); - CLIENT.lock().expect("access client").rx = Some(receiver_thread_rx); + CLIENT.lock().expect("access client").rx = Some(response_thread_rx); println!("[RUST] initialized"); Ok(()) } @@ -127,9 +200,10 @@ ) -> Result<(), String> { println!("[RUST] [put_client_process] begin"); check_error()?; - let c_str: &CStr = unsafe { CStr::from_ptr(data) }; - let str: String = c_str.to_str().unwrap().to_owned(); - println!("[RUST] [put_client_process] data string: {}", str); + let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; + let data_str: String = data_c_str.to_str().unwrap().to_owned(); + println!("[RUST] [put_client_process] field index: {}", field_index); + println!("[RUST] [put_client_process] data string: {}", data_str); RUNTIME.block_on(async { CLIENT @@ -138,7 +212,10 @@ .tx .as_ref() .expect("access client's transmitter") - .send(str) + .send(PutRequestData { + data: data_str, + field_index, + }) .await .expect("send data to receiver"); }); @@ -146,24 +223,19 @@ Ok(()) } +// returns vector of error messages +// empty vector indicates that there were no errors pub fn put_client_terminate_cxx() -> Result<(), String> { println!("[RUST] put_client_terminating"); - let transmitter_handle = CLIENT - .lock() - .expect("access client") - .transmitter_handle - .take() - .unwrap(); + check_error()?; let receiver_handle = CLIENT .lock() .expect("access client") .receiver_handle .take() .unwrap(); - drop(CLIENT.lock().expect("access client").tx.take().unwrap()); RUNTIME.block_on(async { - transmitter_handle.await.unwrap(); receiver_handle.await.unwrap(); });