diff --git a/services/backup/rust_lib/src/lib.rs b/services/backup/rust_lib/src/lib.rs index e938713d8..ce7fe8c7f 100644 --- a/services/backup/rust_lib/src/lib.rs +++ b/services/backup/rust_lib/src/lib.rs @@ -1,85 +1,94 @@ #[cxx::bridge] mod ffi { extern "Rust" { fn rust_is_initialized_cxx() -> bool; fn rust_initialize_cxx() -> (); unsafe fn rust_process_cxx(data: *const c_char) -> (); fn rust_terminate_cxx() -> (); } } mod constants; use constants::MPSC_CHANNEL_BUFFER_CAPACITY; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task::JoinHandle; use lazy_static::lazy_static; use std::sync::{Arc, Mutex}; use libc; use libc::c_char; use std::ffi::CStr; pub struct Client { tx: Option>, handle: Option>, } lazy_static! { pub static ref CLIENT: Arc> = Arc::new(Mutex::new(Client { tx: None, handle: None })); pub static ref RUNTIME: Runtime = Runtime::new().unwrap(); } pub fn rust_is_initialized_cxx() -> bool { if CLIENT.lock().expect("access client").tx.is_none() { return false; } if CLIENT.lock().expect("access client").handle.is_none() { return false; } return true; } pub fn rust_initialize_cxx() -> () { println!("[RUST] initializing"); assert!(!rust_is_initialized_cxx(), "client cannot be initialized twice"); let (tx, mut rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let handle = RUNTIME.spawn(async move { println!("[RUST] [receiver] begin"); while let Some(data) = rx.recv().await { println!("[RUST] [receiver] data: {}", data); } println!("[RUST] [receiver] done"); }); CLIENT.lock().expect("access client").handle = Some(handle); CLIENT.lock().expect("access client").tx = Some(tx); println!("[RUST] initialized"); } pub fn rust_process_cxx(data: *const c_char) -> () { println!("[RUST] [rust_process] begin"); let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_str: String = data_c_str.to_str().unwrap().to_owned(); println!("[RUST] [rust_process] data string: {}", data_str); RUNTIME.block_on(async { CLIENT .lock() .expect("access client") .tx .as_ref() .expect("access client's transmitter") .send(data_str) .await .expect("send data to receiver"); }); println!("[RUST] [rust_process] end"); } pub fn rust_terminate_cxx() -> () { - unimplemented!(); + println!("[RUST] rust_terminating"); + let handle = CLIENT.lock().expect("access client").handle.take().unwrap(); + + drop(CLIENT.lock().expect("access client").tx.take().unwrap()); + RUNTIME.block_on(async { + handle.await.unwrap(); + }); + + assert!(!rust_is_initialized_cxx(), "client handler released properly"); + println!("[RUST] rust_terminated"); }