Page MenuHomePhabricator

D5019.diff
No OneTemporary

D5019.diff

diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.h
@@ -44,7 +44,7 @@
void storeInDatabase();
std::string generateLogID(const std::string &backupID);
- void initializePutReactor();
+ void initializePutClient();
public:
using ServerReadReactorBase<backup::SendLogRequest, backup::SendLogResponse>::
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -1,5 +1,7 @@
#include "SendLogReactor.h"
+#include "blob_client/src/lib.rs.h"
+
#include "Constants.h"
#include "DatabaseManager.h"
#include "GlobalTools.h"
@@ -33,7 +35,7 @@
std::to_string(tools::getCurrentTimestamp());
}
-void SendLogReactor::initializePutReactor() {
+void SendLogReactor::initializePutClient() {
if (this->blobHolder.empty()) {
throw std::runtime_error(
"put reactor cannot be initialized with empty blob holder");
@@ -42,7 +44,7 @@
throw std::runtime_error(
"put reactor cannot be initialized with empty hash");
}
- // todo:blob perform put:initialize
+ put_client_initialize_cxx();
}
std::unique_ptr<grpc::Status>
@@ -86,9 +88,7 @@
if (!request.has_logdata()) {
throw std::runtime_error("log data expected but not received");
}
- std::unique_ptr<std::string> chunk =
- std::make_unique<std::string>(std::move(*request.mutable_logdata()));
- if (chunk->size() == 0) {
+ if (request.mutable_logdata()->size() == 0) {
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->persistenceMethod == PersistenceMethod::DB) {
@@ -98,10 +98,18 @@
"), merge them into bigger parts instead");
}
if (this->persistenceMethod == PersistenceMethod::BLOB) {
- // todo:blob perform put:add chunk (std::move(chunk))
+ put_client_write_cxx(
+ tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
+ request.mutable_logdata()->c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+
return nullptr;
}
- this->value += std::move(*chunk);
+ this->value += std::move(*request.mutable_logdata());
database::LogItem logItem = database::LogItem(
this->backupID, this->logID, true, this->value, "", this->hash);
if (database::LogItem::getItemSize(&logItem) >
@@ -109,8 +117,37 @@
this->persistenceMethod = PersistenceMethod::BLOB;
this->blobHolder =
tools::generateHolder(this->hash, this->backupID, this->logID);
- this->initializePutReactor();
- // todo:blob perform put:add chunk (this->value)
+ this->initializePutClient();
+ put_client_write_cxx(
+ tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
+ this->blobHolder.c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
+ put_client_write_cxx(
+ tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
+ this->hash.c_str());
+ rust::String responseStr =
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably
+ // want to delegate performing ops
+ // to separate threads in the base
+ // reactors
+ // data exists?
+ if ((bool)tools::charPtrToInt(responseStr.c_str())) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ put_client_write_cxx(
+ tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
+ std::move(this->value).c_str());
+ put_client_blocking_read_cxx(); // todo this should be avoided
+ // (blocking); we should be able to
+ // ignore responses; we probably want to
+ // delegate performing ops to separate
+ // threads in the base reactors
this->value = "";
} else {
this->persistenceMethod = PersistenceMethod::DB;
@@ -123,6 +160,13 @@
void SendLogReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+ try {
+ put_client_terminate_cxx();
+ } catch (std::exception &e) {
+ throw std::runtime_error(
+ e.what()); // todo in base reactors we can just handle std exception
+ // instead of keep rethrowing here
+ }
if (!this->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
@@ -138,8 +182,6 @@
this->storeInDatabase();
return;
}
- // todo:blob perform put:add chunk ("")
- // todo:blob perform put:wait for completion
// store in db only when we successfully upload chunks
this->storeInDatabase();
}

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 2:41 AM (17 h, 56 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2567599
Default Alt Text
D5019.diff (5 KB)

Event Timeline