diff --git a/services/backup/rust_lib/Cargo.lock b/services/backup/rust_lib/Cargo.lock --- a/services/backup/rust_lib/Cargo.lock +++ b/services/backup/rust_lib/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + [[package]] name = "cc" version = "1.0.73" @@ -62,6 +68,33 @@ "syn", ] +[[package]] +name = "futures-core" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.132" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" + [[package]] name = "link-cplusplus" version = "1.0.6" @@ -71,12 +104,28 @@ "cc", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + [[package]] name = "proc-macro2" version = "1.0.43" @@ -101,6 +150,10 @@ dependencies = [ "cxx", "cxx-build", + "lazy_static", + "libc", + "tokio", + "tokio-stream", ] [[package]] @@ -129,6 +182,41 @@ "winapi-util", ] +[[package]] +name = "tokio" +version = "1.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581" +dependencies = [ + "autocfg", + "num_cpus", + "once_cell", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.3" diff --git a/services/backup/rust_lib/Cargo.toml b/services/backup/rust_lib/Cargo.toml --- a/services/backup/rust_lib/Cargo.toml +++ b/services/backup/rust_lib/Cargo.toml @@ -5,6 +5,10 @@ [dependencies] cxx = "1.0" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tokio-stream = "0.1.9" +lazy_static = "1.4.0" +libc = "0.2.132" [build-dependencies] cxx-build = "1.0" diff --git a/services/backup/rust_lib/src/constants.rs b/services/backup/rust_lib/src/constants.rs new file mode 100644 --- /dev/null +++ b/services/backup/rust_lib/src/constants.rs @@ -0,0 +1 @@ +pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; diff --git a/services/backup/rust_lib/src/lib.rs b/services/backup/rust_lib/src/lib.rs --- a/services/backup/rust_lib/src/lib.rs +++ b/services/backup/rust_lib/src/lib.rs @@ -1,10 +1,98 @@ #[cxx::bridge] mod ffi { extern "Rust" { - fn test_function() -> i32; + fn rust_initialize_cxx() -> (); + unsafe fn rust_process_cxx(data: *const c_char) -> (); + fn rust_terminate_cxx() -> (); } } -pub fn test_function() -> i32 { - 0 +mod constants; + +use constants::MPSC_CHANNEL_BUFFER_CAPACITY; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use lazy_static::lazy_static; +use std::sync::{Arc, Mutex}; +extern crate libc; +use libc::c_char; +use std::ffi::CStr; + +pub struct Client { + tx: Option>, + handle: Option>, +} + +lazy_static! { + pub static ref CLIENT: Arc> = Arc::new(Mutex::new(Client { + tx: None, + handle: None + })); + pub static ref RUNTIME: Runtime = Runtime::new().unwrap(); +} + +pub fn rust_initialize_cxx() -> () { + println!("[RUST] initializing"); + assert!( + CLIENT.lock().expect("access client").tx.is_none(), + "client's transmitter cannot be initialized twice" + ); + assert!( + CLIENT.lock().expect("access client").handle.is_none(), + "runtime cannot be initialized twice" + ); + let (tx, mut rx): (mpsc::Sender, mpsc::Receiver) = + mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let handle = RUNTIME.spawn(async move { + println!("[RUST] [receiver] begin"); + while let Some(data) = rx.recv().await { + println!("[RUST] [receiver] data: {}", data); + } + println!("[RUST] [receiver] done"); + }); + CLIENT.lock().expect("access client").handle = Some(handle); + CLIENT.lock().expect("access client").tx = Some(tx); + println!("[RUST] initialized"); +} + +pub fn rust_process_cxx(data: *const c_char) -> () { + println!("[RUST] [rust_process] begin"); + let c_str: &CStr = unsafe { CStr::from_ptr(data) }; + let str: String = c_str.to_str().unwrap().to_owned(); + println!("[RUST] [rust_process] data string: {}", str); + + // this works + RUNTIME.block_on(async { + CLIENT + .lock() + .expect("access client") + .tx + .as_ref() + .expect("access client's transmitter") + .send(str) + .await + .expect("send data to receiver"); + }); + println!("[RUST] [rust_process] end"); +} + +pub fn rust_terminate_cxx() -> () { + println!("[RUST] rust_terminating"); + let handle = CLIENT.lock().expect("access client").handle.take().unwrap(); + + drop(CLIENT.lock().expect("access client").tx.take().unwrap()); + RUNTIME.block_on(async { + handle.await.unwrap(); + }); + + assert!( + CLIENT.lock().expect("access client").tx.is_none(), + "client's transmitter released properly" + ); + assert!( + CLIENT.lock().expect("access client").handle.is_none(), + "client's handle released properly" + ); + println!("[RUST] rust_terminated"); }