diff --git a/services/backup/blob_client/src/constants.rs b/services/backup/blob_client/src/constants.rs --- a/services/backup/blob_client/src/constants.rs +++ b/services/backup/blob_client/src/constants.rs @@ -1 +1,2 @@ pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; +pub const BLOB_ADDRESS: &str = "blob-server:50051"; diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -3,14 +3,16 @@ mod put_client; use put_client::{ - put_client_initialize_cxx, put_client_send_cxx, put_client_terminate_cxx, + put_client_blocking_read_cxx, put_client_initialize_cxx, + put_client_terminate_cxx, put_client_write_cxx, }; #[cxx::bridge] mod ffi { extern "Rust" { fn put_client_initialize_cxx() -> (); - unsafe fn put_client_send_cxx(data: *const c_char) -> (); + unsafe fn put_client_write_cxx(data: *const c_char) -> (); + fn put_client_blocking_read_cxx() -> (); fn put_client_terminate_cxx() -> (); } } diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -1,4 +1,4 @@ -use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; +use crate::constants::{MPSC_CHANNEL_BUFFER_CAPACITY}; use lazy_static::lazy_static; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; @@ -8,16 +8,22 @@ use libc::c_char; use std::ffi::CStr; -struct Client { +struct BidiClient { tx: Option>, - handle: Option>, + transmitter_handle: Option>, + + rx: Option>, + receiver_handle: Option>, } lazy_static! { - static ref CLIENT: Arc> = Arc::new(Mutex::new(Client { - tx: None, - handle: None - })); + static ref CLIENT: Arc> = + Arc::new(Mutex::new(BidiClient { + tx: None, + transmitter_handle: None, + rx: None, + receiver_handle: None, + })); static ref RUNTIME: Runtime = Runtime::new().unwrap(); } @@ -25,7 +31,12 @@ if CLIENT.lock().expect("access client").tx.is_none() { return false; } - if CLIENT.lock().expect("access client").handle.is_none() { + if CLIENT + .lock() + .expect("access client") + .transmitter_handle + .is_none() + { return false; } return true; @@ -34,28 +45,59 @@ pub fn put_client_initialize_cxx() -> () { println!("[RUST] initializing"); assert!(!is_initialized(), "client cannot be initialized twice"); - let (tx, mut rx): (mpsc::Sender, mpsc::Receiver) = - mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let handle = RUNTIME.spawn(async move { - println!("[RUST] [receiver] begin"); - while let Some(data) = rx.recv().await { - println!("[RUST] [receiver] data: {}", data); + // spawn transmitter thread + let (transmitter_thread_tx, mut transmitter_thread_rx): ( + mpsc::Sender, + mpsc::Receiver, + ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let transmitter_handle = RUNTIME.spawn(async move { + println!("[RUST] [transmitter_thread] begin"); + while let Some(data) = transmitter_thread_rx.recv().await { + println!("[RUST] [transmitter_thread] data: {}", data); // todo: send throug grpc here } - println!("[RUST] [receiver] done"); + println!("[RUST] [transmitter_thread] done"); + }); + // spawn receiver thread + let (receiver_thread_tx, mut receiver_thread_rx): ( + mpsc::Sender, + mpsc::Receiver, + ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let receiver_handle = RUNTIME.spawn(async move { + println!("[RUST] [receiver_thread] begin"); + // here get responses from grpc and send them with the receiver_thread_tx + println!("[RUST] [receiver_thread] done"); }); - CLIENT.lock().expect("access client").handle = Some(handle); - CLIENT.lock().expect("access client").tx = Some(tx); + + CLIENT.lock().expect("access client").transmitter_handle = + Some(transmitter_handle); + CLIENT.lock().expect("access client").tx = Some(transmitter_thread_tx); + CLIENT.lock().expect("access client").receiver_handle = Some(receiver_handle); + CLIENT.lock().expect("access client").rx = Some(receiver_thread_rx); println!("[RUST] initialized"); } -pub fn put_client_send_cxx(data: *const c_char) -> () { +pub fn put_client_blocking_read_cxx() -> () { + RUNTIME.block_on(async { + let mut rx: mpsc::Receiver = CLIENT + .lock() + .expect("access client") + .rx + .take() + .expect("access client's receiver"); + if let Some(data) = rx.recv().await { + println!("received data {}", data); + } + CLIENT.lock().expect("access client").rx = Some(rx); + }); +} + +pub fn put_client_write_cxx(data: *const c_char) -> () { println!("[RUST] [put_client_process] begin"); let c_str: &CStr = unsafe { CStr::from_ptr(data) }; let str: String = c_str.to_str().unwrap().to_owned(); println!("[RUST] [put_client_process] data string: {}", str); - // this works RUNTIME.block_on(async { CLIENT .lock() @@ -72,13 +114,28 @@ pub fn put_client_terminate_cxx() -> () { println!("[RUST] put_client_terminating"); - let handle = CLIENT.lock().expect("access client").handle.take().unwrap(); + let transmitter_handle = CLIENT + .lock() + .expect("access client") + .transmitter_handle + .take() + .unwrap(); + let receiver_handle = CLIENT + .lock() + .expect("access client") + .receiver_handle + .take() + .unwrap(); drop(CLIENT.lock().expect("access client").tx.take().unwrap()); RUNTIME.block_on(async { - handle.await.unwrap(); + transmitter_handle.await.unwrap(); + receiver_handle.await.unwrap(); }); - assert!(!is_initialized(), "client handler released properly"); + assert!( + !is_initialized(), + "client transmitter handler released properly" + ); println!("[RUST] put_client_terminated"); }