Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3361804
D5036.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Referenced Files
None
Subscribers
None
D5036.diff
View Options
diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -74,23 +74,26 @@
// put into S3
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
- put_client_initialize_cxx();
+ put_client_initialize_cxx(holder.c_str());
put_client_write_cxx(
+ holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
holder.c_str());
- put_client_blocking_read_cxx();
+ put_client_blocking_read_cxx(holder.c_str());
put_client_write_cxx(
+ holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
newLogItem->getDataHash().c_str());
- rust::String responseStr = put_client_blocking_read_cxx();
+ rust::String responseStr = put_client_blocking_read_cxx(holder.c_str());
// data exists?
if (!(bool)tools::charPtrToInt(responseStr.c_str())) {
put_client_write_cxx(
+ holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(data).c_str());
- put_client_blocking_read_cxx();
+ put_client_blocking_read_cxx(holder.c_str());
}
- put_client_terminate_cxx();
+ put_client_terminate_cxx(holder.c_str());
return newLogItem;
}
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -66,26 +66,29 @@
}
response->set_backupid(this->backupID);
this->holder = tools::generateHolder(this->dataHash, this->backupID);
- put_client_initialize_cxx();
+ put_client_initialize_cxx(this->holder.c_str());
put_client_write_cxx(
+ this->holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->holder.c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(
+ this->holder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
put_client_write_cxx(
+ this->holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->dataHash.c_str());
- rust::String responseStr =
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably
- // want to delegate performing ops
- // to separate threads in the base
- // reactors
+ rust::String responseStr = put_client_blocking_read_cxx(
+ this->holder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably
+ // want to delegate performing ops
+ // to separate threads in the base
+ // reactors
// data exists?
if ((bool)tools::charPtrToInt(responseStr.c_str())) {
return std::make_unique<ServerBidiReactorStatus>(
@@ -97,21 +100,18 @@
if (request.mutable_newcompactionchunk()->empty()) {
return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
}
- try {
- put_client_write_cxx(
- tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
- std::string(std::move(*request.mutable_newcompactionchunk()))
- .c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
- } catch (std::exception &e) {
- throw std::runtime_error(
- e.what()); // todo in base reactors we can just handle std exception
- // instead of keep rethrowing here
- }
+ put_client_write_cxx(
+ this->holder.c_str(),
+ tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
+ std::string(std::move(*request.mutable_newcompactionchunk()))
+ .c_str());
+ put_client_blocking_read_cxx(
+ this->holder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+
return nullptr;
}
}
@@ -120,7 +120,7 @@
void CreateNewBackupReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- put_client_terminate_cxx();
+ put_client_terminate_cxx(this->holder.c_str());
// TODO add recovery data
// TODO handle attachments holders
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -44,7 +44,7 @@
throw std::runtime_error(
"put reactor cannot be initialized with empty hash");
}
- put_client_initialize_cxx();
+ put_client_initialize_cxx(this->blobHolder.c_str());
}
std::unique_ptr<grpc::Status>
@@ -99,13 +99,15 @@
}
if (this->persistenceMethod == PersistenceMethod::BLOB) {
put_client_write_cxx(
+ this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
request.mutable_logdata()->c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(
+ this->blobHolder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
return nullptr;
}
@@ -119,35 +121,40 @@
tools::generateHolder(this->hash, this->backupID, this->logID);
this->initializePutClient();
put_client_write_cxx(
+ this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->blobHolder.c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(
+ this->blobHolder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
put_client_write_cxx(
+ this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->hash.c_str());
- rust::String responseStr =
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably
- // want to delegate performing ops
- // to separate threads in the base
- // reactors
+ rust::String responseStr = put_client_blocking_read_cxx(
+ this->blobHolder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably
+ // want to delegate performing ops
+ // to separate threads in the base
+ // reactors
// data exists?
if ((bool)tools::charPtrToInt(responseStr.c_str())) {
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
put_client_write_cxx(
+ this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(this->value).c_str());
- put_client_blocking_read_cxx(); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(
+ this->blobHolder.c_str()); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
this->value = "";
} else {
this->persistenceMethod = PersistenceMethod::DB;
@@ -160,7 +167,7 @@
void SendLogReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- put_client_terminate_cxx();
+ put_client_terminate_cxx(this->blobHolder.c_str());
if (!this->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Nov 25, 7:30 PM (22 h, 4 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2580798
Default Alt Text
D5036.diff (11 KB)
Attached To
Mode
D5036: [services] Rust Integration - Backup - c++ - Multiple put clients
Attached
Detach File
Event Timeline
Log In to Comment