Page MenuHomePhabricator

D5099.id16617.diff
No OneTemporary

D5099.id16617.diff

diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -6,11 +6,8 @@
use proto::GetRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
-use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer};
use anyhow::bail;
use lazy_static::lazy_static;
-use libc;
-use libc::c_char;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
@@ -38,11 +35,10 @@
}
pub fn get_client_initialize_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if is_initialized(&holder)? {
- get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
+ get_client_terminate_cxx(holder.clone())?;
}
if is_initialized(&holder)? {
@@ -56,11 +52,11 @@
// spawn receiver thread
let (response_thread_tx, response_thread_rx) =
mpsc::channel::<Vec<u8>>(MPSC_CHANNEL_BUFFER_CAPACITY);
- let cloned_holder = holder.clone();
+ let holder_string = holder.to_string();
let rx_handle = RUNTIME.spawn(async move {
let response = grpc_client
.get(GetRequest {
- holder: cloned_holder,
+ holder: holder_string,
})
.await?;
let mut inner_response = response.into_inner();
@@ -88,7 +84,7 @@
rx_handle,
rx: response_thread_rx,
};
- (*clients).insert(holder, client);
+ (*clients).insert(holder.to_string(), client);
return Ok(());
}
bail!("could not access client");
@@ -97,12 +93,11 @@
}
pub fn get_client_blocking_read_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<Vec<u8>, anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
- if let Some(client) = clients.get_mut(&holder) {
+ if let Some(client) = clients.get_mut(&holder.to_string()) {
let maybe_data = client.rx.recv().await;
return Ok(maybe_data.unwrap_or_else(|| vec![]));
} else {
@@ -115,14 +110,13 @@
}
pub fn get_client_terminate_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if !is_initialized(&holder)? {
return Ok(());
}
if let Ok(mut clients) = CLIENTS.lock() {
- match clients.remove(&holder) {
+ match clients.remove(&holder.to_string()) {
Some(client) => {
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -1,7 +1,6 @@
mod constants;
mod get_client;
mod put_client;
-mod tools;
use put_client::{
put_client_blocking_read_cxx, put_client_initialize_cxx,
@@ -15,28 +14,28 @@
#[cxx::bridge]
mod ffi {
extern "Rust" {
- unsafe fn put_client_initialize_cxx(
- holder_char: *const c_char,
+ fn put_client_initialize_cxx(
+ holder_char: &str,
) -> Result<()>;
unsafe fn put_client_write_cxx(
- holder_char: *const c_char,
+ holder_char: &str,
field_index: usize,
data: *const c_char,
) -> Result<()>;
- unsafe fn put_client_blocking_read_cxx(
- holder_char: *const c_char,
+ fn put_client_blocking_read_cxx(
+ holder_char: &str,
) -> Result<String>;
- unsafe fn put_client_terminate_cxx(
- holder_char: *const c_char,
+ fn put_client_terminate_cxx(
+ holder_char: &str,
) -> Result<()>;
- unsafe fn get_client_initialize_cxx(
- holder_char: *const c_char,
+ fn get_client_initialize_cxx(
+ holder_char: &str,
) -> Result<()>;
- unsafe fn get_client_blocking_read_cxx(
- holder_char: *const c_char,
+ fn get_client_blocking_read_cxx(
+ holder_char: &str,
) -> Result<Vec<u8>>;
- unsafe fn get_client_terminate_cxx(
- holder_char: *const c_char,
+ fn get_client_terminate_cxx(
+ holder_char: &str,
) -> Result<()>;
}
}
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -8,7 +8,6 @@
use proto::PutRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
-use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer};
use anyhow::bail;
use lazy_static::lazy_static;
use libc;
@@ -51,11 +50,10 @@
}
pub fn put_client_initialize_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if is_initialized(&holder)? {
- put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
+ put_client_terminate_cxx(&holder.to_string())?;
}
if is_initialized(&holder)? {
bail!("client cannot be initialized twice");
@@ -160,7 +158,7 @@
rx: response_thread_rx,
rx_handle,
};
- (*clients).insert(holder, client);
+ (*clients).insert(holder.to_string(), client);
return Ok(());
}
bail!(format!("could not access client for holder {}", holder));
@@ -169,12 +167,11 @@
}
pub fn put_client_blocking_read_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<String, anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
- let maybe_client = clients.get_mut(&holder);
+ let maybe_client = clients.get_mut(holder);
if let Some(client) = maybe_client {
if let Some(data) = client.rx.recv().await {
return Ok(data);
@@ -200,17 +197,16 @@
* 3 - data chunk (bytes)
*/
pub fn put_client_write_cxx(
- holder_char: *const c_char,
+ holder: &str,
field_index: usize,
data: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
RUNTIME.block_on(async {
if let Ok(clients) = CLIENTS.lock() {
- let maybe_client = clients.get(&holder);
+ let maybe_client = clients.get(&holder.to_string());
if let Some(client) = maybe_client {
client
.tx
@@ -230,15 +226,14 @@
}
pub fn put_client_terminate_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if !is_initialized(&holder)? {
return Ok(());
}
if let Ok(mut clients) = CLIENTS.lock() {
- let maybe_client = clients.remove(&holder);
+ let maybe_client = clients.remove(&holder.to_string());
if let Some(client) = maybe_client {
drop(client.tx);
RUNTIME.block_on(async { client.rx_handle.await? })?;
diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -74,29 +74,26 @@
// put into S3
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- put_client_initialize_cxx(holder.c_str());
+ put_client_initialize_cxx(rust::String(holder));
put_client_write_cxx(
- holder.c_str(),
+ rust::String(holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
holder.c_str());
- put_client_blocking_read_cxx(
- holder.c_str());
+ put_client_blocking_read_cxx(rust::String(holder));
put_client_write_cxx(
- holder.c_str(),
+ rust::String(holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
newLogItem->getDataHash().c_str());
- rust::String responseStr = put_client_blocking_read_cxx(
- holder.c_str());
+ rust::String responseStr = put_client_blocking_read_cxx(rust::String(holder));
// data exists?
if (!(bool)tools::charPtrToInt(responseStr.c_str())) {
put_client_write_cxx(
- holder.c_str(),
+ rust::String(holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(data).c_str());
- put_client_blocking_read_cxx(
- holder.c_str());
+ put_client_blocking_read_cxx(rust::String(holder));
}
- put_client_terminate_cxx(holder.c_str());
+ put_client_terminate_cxx(rust::String(holder));
return newLogItem;
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -66,19 +66,19 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- put_client_initialize_cxx(this->holder.c_str());
+ put_client_initialize_cxx(rust::String(this->holder));
put_client_write_cxx(
- this->holder.c_str(),
+ rust::String(this->holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->holder.c_str());
- put_client_blocking_read_cxx(this->holder.c_str());
+ put_client_blocking_read_cxx(rust::String(this->holder));
put_client_write_cxx(
- this->holder.c_str(),
+ rust::String(this->holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->dataHash.c_str());
rust::String responseStr =
- put_client_blocking_read_cxx(this->holder.c_str());
+ put_client_blocking_read_cxx(rust::String(this->holder));
// data exists?
if ((bool)tools::charPtrToInt(responseStr.c_str())) {
return std::make_unique<ServerBidiReactorStatus>(
@@ -91,11 +91,11 @@
return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
}
put_client_write_cxx(
- this->holder.c_str(),
+ rust::String(this->holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::string(std::move(*request.mutable_newcompactionchunk()))
.c_str());
- put_client_blocking_read_cxx(this->holder.c_str());
+ put_client_blocking_read_cxx(rust::String(this->holder));
return nullptr;
}
@@ -105,7 +105,7 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- put_client_terminate_cxx(this->holder.c_str());
+ put_client_terminate_cxx(rust::String(this->holder));
// TODO add recovery data
// TODO handle attachments holders
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -19,7 +19,7 @@
throw std::runtime_error(
"get reactor cannot be initialized when backup item is missing");
}
- get_client_initialize_cxx(holder.c_str());
+ get_client_initialize_cxx(rust::String(holder));
this->clientInitialized = true;
}
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -43,7 +43,7 @@
throw std::runtime_error(
"put reactor cannot be initialized with empty hash");
}
- put_client_initialize_cxx(this->blobHolder.c_str());
+ put_client_initialize_cxx(rust::String(this->blobHolder));
}
std::unique_ptr<grpc::Status>
@@ -98,7 +98,7 @@
}
if (this->persistenceMethod == PersistenceMethod::BLOB) {
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
request.mutable_logdata()->c_str());
put_client_blocking_read_cxx(this->blobHolder.c_str());
@@ -115,12 +115,12 @@
tools::generateHolder(this->hash, this->backupID, this->logID);
this->initializePutClient();
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->blobHolder.c_str());
put_client_blocking_read_cxx(this->blobHolder.c_str());
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->hash.c_str());
rust::String responseStr =
@@ -130,7 +130,7 @@
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(this->value).c_str());
put_client_blocking_read_cxx(this->blobHolder.c_str());
@@ -146,7 +146,7 @@
void SendLogReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- put_client_terminate_cxx(this->blobHolder.c_str());
+ put_client_terminate_cxx(rust::String(this->blobHolder));
if (!this->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 7:48 AM (21 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2583929
Default Alt Text
D5099.id16617.diff (13 KB)

Event Timeline