Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3365801
D5099.id16617.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Referenced Files
None
Subscribers
None
D5099.id16617.diff
View Options
diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -6,11 +6,8 @@
use proto::GetRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
-use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer};
use anyhow::bail;
use lazy_static::lazy_static;
-use libc;
-use libc::c_char;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
@@ -38,11 +35,10 @@
}
pub fn get_client_initialize_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if is_initialized(&holder)? {
- get_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
+ get_client_terminate_cxx(holder.clone())?;
}
if is_initialized(&holder)? {
@@ -56,11 +52,11 @@
// spawn receiver thread
let (response_thread_tx, response_thread_rx) =
mpsc::channel::<Vec<u8>>(MPSC_CHANNEL_BUFFER_CAPACITY);
- let cloned_holder = holder.clone();
+ let holder_string = holder.to_string();
let rx_handle = RUNTIME.spawn(async move {
let response = grpc_client
.get(GetRequest {
- holder: cloned_holder,
+ holder: holder_string,
})
.await?;
let mut inner_response = response.into_inner();
@@ -88,7 +84,7 @@
rx_handle,
rx: response_thread_rx,
};
- (*clients).insert(holder, client);
+ (*clients).insert(holder.to_string(), client);
return Ok(());
}
bail!("could not access client");
@@ -97,12 +93,11 @@
}
pub fn get_client_blocking_read_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<Vec<u8>, anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
- if let Some(client) = clients.get_mut(&holder) {
+ if let Some(client) = clients.get_mut(&holder.to_string()) {
let maybe_data = client.rx.recv().await;
return Ok(maybe_data.unwrap_or_else(|| vec![]));
} else {
@@ -115,14 +110,13 @@
}
pub fn get_client_terminate_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if !is_initialized(&holder)? {
return Ok(());
}
if let Ok(mut clients) = CLIENTS.lock() {
- match clients.remove(&holder) {
+ match clients.remove(&holder.to_string()) {
Some(client) => {
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -1,7 +1,6 @@
mod constants;
mod get_client;
mod put_client;
-mod tools;
use put_client::{
put_client_blocking_read_cxx, put_client_initialize_cxx,
@@ -15,28 +14,28 @@
#[cxx::bridge]
mod ffi {
extern "Rust" {
- unsafe fn put_client_initialize_cxx(
- holder_char: *const c_char,
+ fn put_client_initialize_cxx(
+ holder_char: &str,
) -> Result<()>;
unsafe fn put_client_write_cxx(
- holder_char: *const c_char,
+ holder_char: &str,
field_index: usize,
data: *const c_char,
) -> Result<()>;
- unsafe fn put_client_blocking_read_cxx(
- holder_char: *const c_char,
+ fn put_client_blocking_read_cxx(
+ holder_char: &str,
) -> Result<String>;
- unsafe fn put_client_terminate_cxx(
- holder_char: *const c_char,
+ fn put_client_terminate_cxx(
+ holder_char: &str,
) -> Result<()>;
- unsafe fn get_client_initialize_cxx(
- holder_char: *const c_char,
+ fn get_client_initialize_cxx(
+ holder_char: &str,
) -> Result<()>;
- unsafe fn get_client_blocking_read_cxx(
- holder_char: *const c_char,
+ fn get_client_blocking_read_cxx(
+ holder_char: &str,
) -> Result<Vec<u8>>;
- unsafe fn get_client_terminate_cxx(
- holder_char: *const c_char,
+ fn get_client_terminate_cxx(
+ holder_char: &str,
) -> Result<()>;
}
}
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -8,7 +8,6 @@
use proto::PutRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
-use crate::tools::{c_char_pointer_to_string, string_to_c_char_pointer};
use anyhow::bail;
use lazy_static::lazy_static;
use libc;
@@ -51,11 +50,10 @@
}
pub fn put_client_initialize_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if is_initialized(&holder)? {
- put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
+ put_client_terminate_cxx(&holder.to_string())?;
}
if is_initialized(&holder)? {
bail!("client cannot be initialized twice");
@@ -160,7 +158,7 @@
rx: response_thread_rx,
rx_handle,
};
- (*clients).insert(holder, client);
+ (*clients).insert(holder.to_string(), client);
return Ok(());
}
bail!(format!("could not access client for holder {}", holder));
@@ -169,12 +167,11 @@
}
pub fn put_client_blocking_read_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<String, anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
Ok(RUNTIME.block_on(async {
if let Ok(mut clients) = CLIENTS.lock() {
- let maybe_client = clients.get_mut(&holder);
+ let maybe_client = clients.get_mut(holder);
if let Some(client) = maybe_client {
if let Some(data) = client.rx.recv().await {
return Ok(data);
@@ -200,17 +197,16 @@
* 3 - data chunk (bytes)
*/
pub fn put_client_write_cxx(
- holder_char: *const c_char,
+ holder: &str,
field_index: usize,
data: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
RUNTIME.block_on(async {
if let Ok(clients) = CLIENTS.lock() {
- let maybe_client = clients.get(&holder);
+ let maybe_client = clients.get(&holder.to_string());
if let Some(client) = maybe_client {
client
.tx
@@ -230,15 +226,14 @@
}
pub fn put_client_terminate_cxx(
- holder_char: *const c_char,
+ holder: &str,
) -> anyhow::Result<(), anyhow::Error> {
- let holder = c_char_pointer_to_string(holder_char)?;
if !is_initialized(&holder)? {
return Ok(());
}
if let Ok(mut clients) = CLIENTS.lock() {
- let maybe_client = clients.remove(&holder);
+ let maybe_client = clients.remove(&holder.to_string());
if let Some(client) = maybe_client {
drop(client.tx);
RUNTIME.block_on(async { client.rx_handle.await? })?;
diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -74,29 +74,26 @@
// put into S3
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- put_client_initialize_cxx(holder.c_str());
+ put_client_initialize_cxx(rust::String(holder));
put_client_write_cxx(
- holder.c_str(),
+ rust::String(holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
holder.c_str());
- put_client_blocking_read_cxx(
- holder.c_str());
+ put_client_blocking_read_cxx(rust::String(holder));
put_client_write_cxx(
- holder.c_str(),
+ rust::String(holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
newLogItem->getDataHash().c_str());
- rust::String responseStr = put_client_blocking_read_cxx(
- holder.c_str());
+ rust::String responseStr = put_client_blocking_read_cxx(rust::String(holder));
// data exists?
if (!(bool)tools::charPtrToInt(responseStr.c_str())) {
put_client_write_cxx(
- holder.c_str(),
+ rust::String(holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(data).c_str());
- put_client_blocking_read_cxx(
- holder.c_str());
+ put_client_blocking_read_cxx(rust::String(holder));
}
- put_client_terminate_cxx(holder.c_str());
+ put_client_terminate_cxx(rust::String(holder));
return newLogItem;
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -66,19 +66,19 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- put_client_initialize_cxx(this->holder.c_str());
+ put_client_initialize_cxx(rust::String(this->holder));
put_client_write_cxx(
- this->holder.c_str(),
+ rust::String(this->holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->holder.c_str());
- put_client_blocking_read_cxx(this->holder.c_str());
+ put_client_blocking_read_cxx(rust::String(this->holder));
put_client_write_cxx(
- this->holder.c_str(),
+ rust::String(this->holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->dataHash.c_str());
rust::String responseStr =
- put_client_blocking_read_cxx(this->holder.c_str());
+ put_client_blocking_read_cxx(rust::String(this->holder));
// data exists?
if ((bool)tools::charPtrToInt(responseStr.c_str())) {
return std::make_unique<ServerBidiReactorStatus>(
@@ -91,11 +91,11 @@
return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
}
put_client_write_cxx(
- this->holder.c_str(),
+ rust::String(this->holder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::string(std::move(*request.mutable_newcompactionchunk()))
.c_str());
- put_client_blocking_read_cxx(this->holder.c_str());
+ put_client_blocking_read_cxx(rust::String(this->holder));
return nullptr;
}
@@ -105,7 +105,7 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- put_client_terminate_cxx(this->holder.c_str());
+ put_client_terminate_cxx(rust::String(this->holder));
// TODO add recovery data
// TODO handle attachments holders
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -19,7 +19,7 @@
throw std::runtime_error(
"get reactor cannot be initialized when backup item is missing");
}
- get_client_initialize_cxx(holder.c_str());
+ get_client_initialize_cxx(rust::String(holder));
this->clientInitialized = true;
}
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -43,7 +43,7 @@
throw std::runtime_error(
"put reactor cannot be initialized with empty hash");
}
- put_client_initialize_cxx(this->blobHolder.c_str());
+ put_client_initialize_cxx(rust::String(this->blobHolder));
}
std::unique_ptr<grpc::Status>
@@ -98,7 +98,7 @@
}
if (this->persistenceMethod == PersistenceMethod::BLOB) {
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
request.mutable_logdata()->c_str());
put_client_blocking_read_cxx(this->blobHolder.c_str());
@@ -115,12 +115,12 @@
tools::generateHolder(this->hash, this->backupID, this->logID);
this->initializePutClient();
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->blobHolder.c_str());
put_client_blocking_read_cxx(this->blobHolder.c_str());
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->hash.c_str());
rust::String responseStr =
@@ -130,7 +130,7 @@
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
put_client_write_cxx(
- this->blobHolder.c_str(),
+ rust::String(this->blobHolder),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(this->value).c_str());
put_client_blocking_read_cxx(this->blobHolder.c_str());
@@ -146,7 +146,7 @@
void SendLogReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- put_client_terminate_cxx(this->blobHolder.c_str());
+ put_client_terminate_cxx(rust::String(this->blobHolder));
if (!this->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Nov 26, 7:48 AM (21 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2583929
Default Alt Text
D5099.id16617.diff (13 KB)
Attached To
Mode
D5099: [services] Backup - Blob Get Client - Pass rust string instead of const char*
Attached
Detach File
Event Timeline
Log In to Comment