diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -6,11 +6,8 @@ use proto::GetRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer}; use anyhow::bail; use lazy_static::lazy_static; -use libc; -use libc::c_char; use std::collections::HashMap; use std::sync::Mutex; use tokio::runtime::Runtime; @@ -38,11 +35,10 @@ } pub fn get_client_initialize_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if is_initialized(&holder)? { - get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; + get_client_terminate_cxx(holder.clone())?; } if is_initialized(&holder)? { @@ -56,11 +52,11 @@ // spawn receiver thread let (response_thread_tx, response_thread_rx) = mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); - let cloned_holder = holder.clone(); + let holder_string = holder.to_string(); let rx_handle = RUNTIME.spawn(async move { let response = grpc_client .get(GetRequest { - holder: cloned_holder, + holder: holder_string, }) .await?; let mut inner_response = response.into_inner(); @@ -88,7 +84,7 @@ rx_handle, rx: response_thread_rx, }; - (*clients).insert(holder, client); + (*clients).insert(holder.to_string(), client); return Ok(()); } bail!("could not access client"); @@ -97,12 +93,11 @@ } pub fn get_client_blocking_read_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result, anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { - if let Some(client) = clients.get_mut(&holder) { + if let Some(client) = clients.get_mut(&holder.to_string()) { let maybe_data = client.rx.recv().await; return Ok(maybe_data.unwrap_or_else(|| vec![])); } else { @@ -115,14 +110,13 @@ } pub fn get_client_terminate_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if !is_initialized(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { - match clients.remove(&holder) { + match clients.remove(&holder.to_string()) { Some(client) => { RUNTIME.block_on(async { if client.rx_handle.await.is_err() { diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs --- a/services/backup/blob_client/src/lib.rs +++ b/services/backup/blob_client/src/lib.rs @@ -1,7 +1,6 @@ mod constants; mod get_client; mod put_client; -mod tools; use put_client::{ put_client_blocking_read_cxx, put_client_initialize_cxx, @@ -15,28 +14,28 @@ #[cxx::bridge] mod ffi { extern "Rust" { - unsafe fn put_client_initialize_cxx( - holder_char: *const c_char, + fn put_client_initialize_cxx( + holder_char: &str, ) -> Result<()>; unsafe fn put_client_write_cxx( - holder_char: *const c_char, + holder_char: &str, field_index: usize, data: *const c_char, ) -> Result<()>; - unsafe fn put_client_blocking_read_cxx( - holder_char: *const c_char, + fn put_client_blocking_read_cxx( + holder_char: &str, ) -> Result; - unsafe fn put_client_terminate_cxx( - holder_char: *const c_char, + fn put_client_terminate_cxx( + holder_char: &str, ) -> Result<()>; - unsafe fn get_client_initialize_cxx( - holder_char: *const c_char, + fn get_client_initialize_cxx( + holder_char: &str, ) -> Result<()>; - unsafe fn get_client_blocking_read_cxx( - holder_char: *const c_char, + fn get_client_blocking_read_cxx( + holder_char: &str, ) -> Result>; - unsafe fn get_client_terminate_cxx( - holder_char: *const c_char, + fn get_client_terminate_cxx( + holder_char: &str, ) -> Result<()>; } } diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs --- a/services/backup/blob_client/src/put_client.rs +++ b/services/backup/blob_client/src/put_client.rs @@ -8,7 +8,6 @@ use proto::PutRequest; use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY}; -use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer}; use anyhow::bail; use lazy_static::lazy_static; use libc; @@ -51,11 +50,10 @@ } pub fn put_client_initialize_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if is_initialized(&holder)? { - put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?; + put_client_terminate_cxx(&holder.to_string())?; } if is_initialized(&holder)? { bail!("client cannot be initialized twice"); @@ -160,7 +158,7 @@ rx: response_thread_rx, rx_handle, }; - (*clients).insert(holder, client); + (*clients).insert(holder.to_string(), client); return Ok(()); } bail!(format!("could not access client for holder {}", holder)); @@ -169,12 +167,11 @@ } pub fn put_client_blocking_read_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result { - let holder = c_char_pointer_to_string(holder_char)?; Ok(RUNTIME.block_on(async { if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.get_mut(&holder); + let maybe_client = clients.get_mut(holder); if let Some(client) = maybe_client { if let Some(data) = client.rx.recv().await { return Ok(data); @@ -200,17 +197,16 @@ * 3 - data chunk (bytes) */ pub fn put_client_write_cxx( - holder_char: *const c_char, + holder: &str, field_index: usize, data: *const c_char, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; let data_c_str: &CStr = unsafe { CStr::from_ptr(data) }; let data_bytes: Vec = data_c_str.to_bytes().to_vec(); RUNTIME.block_on(async { if let Ok(clients) = CLIENTS.lock() { - let maybe_client = clients.get(&holder); + let maybe_client = clients.get(&holder.to_string()); if let Some(client) = maybe_client { client .tx @@ -230,15 +226,14 @@ } pub fn put_client_terminate_cxx( - holder_char: *const c_char, + holder: &str, ) -> anyhow::Result<(), anyhow::Error> { - let holder = c_char_pointer_to_string(holder_char)?; if !is_initialized(&holder)? { return Ok(()); } if let Ok(mut clients) = CLIENTS.lock() { - let maybe_client = clients.remove(&holder); + let maybe_client = clients.remove(&holder.to_string()); if let Some(client) = maybe_client { drop(client.tx); RUNTIME.block_on(async { client.rx_handle.await? })?; diff --git a/services/backup/blob_client/src/tools.rs b/services/backup/blob_client/src/tools.rs deleted file mode 100644 --- a/services/backup/blob_client/src/tools.rs +++ /dev/null @@ -1,15 +0,0 @@ -use libc::c_char; -use std::ffi::{CStr, CString}; - -pub fn c_char_pointer_to_string( - c_char_pointer: *const c_char, -) -> anyhow::Result { - let holder_cstr: &CStr = unsafe { CStr::from_ptr(c_char_pointer) }; - Ok(holder_cstr.to_str()?.to_owned()) -} - -pub fn string_to_c_char_pointer( - signs: &String, -) -> anyhow::Result<*const c_char, anyhow::Error> { - Ok(CString::new((&signs).as_bytes())?.as_ptr()) -} diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp --- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -74,29 +74,26 @@ // put into S3 std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; - put_client_initialize_cxx(holder.c_str()); + put_client_initialize_cxx(rust::String(holder)); put_client_write_cxx( - holder.c_str(), + rust::String(holder), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), holder.c_str()); - put_client_blocking_read_cxx( - holder.c_str()); + put_client_blocking_read_cxx(rust::String(holder)); put_client_write_cxx( - holder.c_str(), + rust::String(holder), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), newLogItem->getDataHash().c_str()); - rust::String responseStr = put_client_blocking_read_cxx( - holder.c_str()); + rust::String responseStr = put_client_blocking_read_cxx(rust::String(holder)); // data exists? if (!(bool)tools::charPtrToInt(responseStr.c_str())) { put_client_write_cxx( - holder.c_str(), + rust::String(holder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(data).c_str()); - put_client_blocking_read_cxx( - holder.c_str()); + put_client_blocking_read_cxx(rust::String(holder)); } - put_client_terminate_cxx(holder.c_str()); + put_client_terminate_cxx(rust::String(holder)); return newLogItem; } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -66,19 +66,19 @@ } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - put_client_initialize_cxx(this->holder.c_str()); + put_client_initialize_cxx(rust::String(this->holder)); put_client_write_cxx( - this->holder.c_str(), + rust::String(this->holder), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->holder.c_str()); - put_client_blocking_read_cxx(this->holder.c_str()); + put_client_blocking_read_cxx(rust::String(this->holder)); put_client_write_cxx( - this->holder.c_str(), + rust::String(this->holder), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->dataHash.c_str()); rust::String responseStr = - put_client_blocking_read_cxx(this->holder.c_str()); + put_client_blocking_read_cxx(rust::String(this->holder)); // data exists? if ((bool)tools::charPtrToInt(responseStr.c_str())) { return std::make_unique( @@ -91,11 +91,11 @@ return std::make_unique(grpc::Status::OK); } put_client_write_cxx( - this->holder.c_str(), + rust::String(this->holder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::string(std::move(*request.mutable_newcompactionchunk())) .c_str()); - put_client_blocking_read_cxx(this->holder.c_str()); + put_client_blocking_read_cxx(rust::String(this->holder)); return nullptr; } @@ -105,7 +105,7 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(this->holder.c_str()); + put_client_terminate_cxx(rust::String(this->holder)); // TODO add recovery data // TODO handle attachments holders diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -19,7 +19,7 @@ throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - get_client_initialize_cxx(holder.c_str()); + get_client_initialize_cxx(rust::String(holder)); this->clientInitialized = true; } diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -43,7 +43,7 @@ throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - put_client_initialize_cxx(this->blobHolder.c_str()); + put_client_initialize_cxx(rust::String(this->blobHolder)); } std::unique_ptr @@ -98,7 +98,7 @@ } if (this->persistenceMethod == PersistenceMethod::BLOB) { put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), request.mutable_logdata()->c_str()); put_client_blocking_read_cxx(this->blobHolder.c_str()); @@ -115,12 +115,12 @@ tools::generateHolder(this->hash, this->backupID, this->logID); this->initializePutClient(); put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), this->blobHolder.c_str()); put_client_blocking_read_cxx(this->blobHolder.c_str()); put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), this->hash.c_str()); rust::String responseStr = @@ -130,7 +130,7 @@ return std::make_unique(grpc::Status::OK); } put_client_write_cxx( - this->blobHolder.c_str(), + rust::String(this->blobHolder), tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), std::move(this->value).c_str()); put_client_blocking_read_cxx(this->blobHolder.c_str()); @@ -146,7 +146,7 @@ void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - put_client_terminate_cxx(this->blobHolder.c_str()); + put_client_terminate_cxx(rust::String(this->blobHolder)); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error(