Page MenuHomePhabricator

D4942.id15919.diff
No OneTemporary

D4942.id15919.diff

diff --git a/services/backup/blob_client/src/constants.rs b/services/backup/blob_client/src/constants.rs
--- a/services/backup/blob_client/src/constants.rs
+++ b/services/backup/blob_client/src/constants.rs
@@ -1 +1,2 @@
pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
+pub const BLOB_ADDRESS: &str = "blob-server:50051";
diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -3,14 +3,16 @@
mod put_client;
use put_client::{
- put_client_initialize_cxx, put_client_send_cxx, put_client_terminate_cxx,
+ put_client_blocking_read_cxx, put_client_initialize_cxx,
+ put_client_terminate_cxx, put_client_write_cxx,
};
#[cxx::bridge]
mod ffi {
extern "Rust" {
fn put_client_initialize_cxx() -> ();
- unsafe fn put_client_send_cxx(data: *const c_char) -> ();
+ unsafe fn put_client_write_cxx(data: *const c_char) -> ();
+ fn put_client_blocking_read_cxx() -> ();
fn put_client_terminate_cxx() -> ();
}
}
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -1,4 +1,4 @@
-use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
+use crate::constants::{MPSC_CHANNEL_BUFFER_CAPACITY};
use lazy_static::lazy_static;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
@@ -8,16 +8,22 @@
use libc::c_char;
use std::ffi::CStr;
-struct Client {
+struct BidiClient {
tx: Option<mpsc::Sender<String>>,
- handle: Option<JoinHandle<()>>,
+ transmitter_handle: Option<JoinHandle<()>>,
+
+ rx: Option<mpsc::Receiver<String>>,
+ receiver_handle: Option<JoinHandle<()>>,
}
lazy_static! {
- static ref CLIENT: Arc<Mutex<Client>> = Arc::new(Mutex::new(Client {
- tx: None,
- handle: None
- }));
+ static ref CLIENT: Arc<Mutex<BidiClient>> =
+ Arc::new(Mutex::new(BidiClient {
+ tx: None,
+ transmitter_handle: None,
+ rx: None,
+ receiver_handle: None,
+ }));
static ref RUNTIME: Runtime = Runtime::new().unwrap();
}
@@ -25,7 +31,12 @@
if CLIENT.lock().expect("access client").tx.is_none() {
return false;
}
- if CLIENT.lock().expect("access client").handle.is_none() {
+ if CLIENT
+ .lock()
+ .expect("access client")
+ .transmitter_handle
+ .is_none()
+ {
return false;
}
return true;
@@ -34,28 +45,59 @@
pub fn put_client_initialize_cxx() -> () {
println!("[RUST] initializing");
assert!(!is_initialized(), "client cannot be initialized twice");
- let (tx, mut rx): (mpsc::Sender<String>, mpsc::Receiver<String>) =
- mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let handle = RUNTIME.spawn(async move {
- println!("[RUST] [receiver] begin");
- while let Some(data) = rx.recv().await {
- println!("[RUST] [receiver] data: {}", data);
+ // spawn transmitter thread
+ let (transmitter_thread_tx, mut transmitter_thread_rx): (
+ mpsc::Sender<String>,
+ mpsc::Receiver<String>,
+ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let transmitter_handle = RUNTIME.spawn(async move {
+ println!("[RUST] [transmitter_thread] begin");
+ while let Some(data) = transmitter_thread_rx.recv().await {
+ println!("[RUST] [transmitter_thread] data: {}", data);
// todo: send throug grpc here
}
- println!("[RUST] [receiver] done");
+ println!("[RUST] [transmitter_thread] done");
+ });
+ // spawn receiver thread
+ let (receiver_thread_tx, mut receiver_thread_rx): (
+ mpsc::Sender<String>,
+ mpsc::Receiver<String>,
+ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let receiver_handle = RUNTIME.spawn(async move {
+ println!("[RUST] [receiver_thread] begin");
+ // here get responses from grpc and send them with the receiver_thread_tx
+ println!("[RUST] [receiver_thread] done");
});
- CLIENT.lock().expect("access client").handle = Some(handle);
- CLIENT.lock().expect("access client").tx = Some(tx);
+
+ CLIENT.lock().expect("access client").transmitter_handle =
+ Some(transmitter_handle);
+ CLIENT.lock().expect("access client").tx = Some(transmitter_thread_tx);
+ CLIENT.lock().expect("access client").receiver_handle = Some(receiver_handle);
+ CLIENT.lock().expect("access client").rx = Some(receiver_thread_rx);
println!("[RUST] initialized");
}
-pub fn put_client_send_cxx(data: *const c_char) -> () {
+pub fn put_client_blocking_read_cxx() -> () {
+ RUNTIME.block_on(async {
+ let mut rx: mpsc::Receiver<String> = CLIENT
+ .lock()
+ .expect("access client")
+ .rx
+ .take()
+ .expect("access client's receiver");
+ if let Some(data) = rx.recv().await {
+ println!("received data {}", data);
+ }
+ CLIENT.lock().expect("access client").rx = Some(rx);
+ });
+}
+
+pub fn put_client_write_cxx(data: *const c_char) -> () {
println!("[RUST] [put_client_process] begin");
let c_str: &CStr = unsafe { CStr::from_ptr(data) };
let str: String = c_str.to_str().unwrap().to_owned();
println!("[RUST] [put_client_process] data string: {}", str);
- // this works
RUNTIME.block_on(async {
CLIENT
.lock()
@@ -72,13 +114,28 @@
pub fn put_client_terminate_cxx() -> () {
println!("[RUST] put_client_terminating");
- let handle = CLIENT.lock().expect("access client").handle.take().unwrap();
+ let transmitter_handle = CLIENT
+ .lock()
+ .expect("access client")
+ .transmitter_handle
+ .take()
+ .unwrap();
+ let receiver_handle = CLIENT
+ .lock()
+ .expect("access client")
+ .receiver_handle
+ .take()
+ .unwrap();
drop(CLIENT.lock().expect("access client").tx.take().unwrap());
RUNTIME.block_on(async {
- handle.await.unwrap();
+ transmitter_handle.await.unwrap();
+ receiver_handle.await.unwrap();
});
- assert!(!is_initialized(), "client handler released properly");
+ assert!(
+ !is_initialized(),
+ "client transmitter handler released properly"
+ );
println!("[RUST] put_client_terminated");
}

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 26, 5:43 PM (13 h, 51 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2707812
Default Alt Text
D4942.id15919.diff (6 KB)

Event Timeline