diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -44,7 +44,7 @@ void storeInDatabase(); std::string generateLogID(const std::string &backupID); - void initializePutReactor(); + void initializePutClient(); public: using ServerReadReactorBase:: diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,5 +1,7 @@ #include "SendLogReactor.h" +#include "blob_client/src/lib.rs.h" + #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" @@ -33,7 +35,7 @@ std::to_string(tools::getCurrentTimestamp()); } -void SendLogReactor::initializePutReactor() { +void SendLogReactor::initializePutClient() { if (this->blobHolder.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty blob holder"); @@ -42,7 +44,7 @@ throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } - // todo:blob perform put:initialize + put_client_initialize_cxx(); } std::unique_ptr @@ -86,9 +88,7 @@ if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } - std::unique_ptr chunk = - std::make_unique(std::move(*request.mutable_logdata())); - if (chunk->size() == 0) { + if (request.mutable_logdata()->size() == 0) { return std::make_unique(grpc::Status::OK); } if (this->persistenceMethod == PersistenceMethod::DB) { @@ -98,10 +98,18 @@ "), merge them into bigger parts instead"); } if (this->persistenceMethod == PersistenceMethod::BLOB) { - // todo:blob perform put:add chunk (std::move(chunk)) + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), + request.mutable_logdata()->c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + return nullptr; } - this->value += std::move(*chunk); + this->value += std::move(*request.mutable_logdata()); database::LogItem logItem = database::LogItem( this->backupID, this->logID, true, this->value, "", this->hash); if (database::LogItem::getItemSize(&logItem) > @@ -109,8 +117,37 @@ this->persistenceMethod = PersistenceMethod::BLOB; this->blobHolder = tools::generateHolder(this->hash, this->backupID, this->logID); - this->initializePutReactor(); - // todo:blob perform put:add chunk (this->value) + this->initializePutClient(); + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::HOLDER), + this->blobHolder.c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::HASH), + this->hash.c_str()); + rust::String responseStr = + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors + // data exists? + if ((bool)tools::charPtrToInt(responseStr.c_str())) { + return std::make_unique(grpc::Status::OK); + } + put_client_write_cxx( + tools::getBlobPutField(tools::BlobPutField::DATA_CHUNK), + std::move(this->value).c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors this->value = ""; } else { this->persistenceMethod = PersistenceMethod::DB; @@ -123,6 +160,13 @@ void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); + try { + put_client_terminate_cxx(); + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( @@ -138,8 +182,6 @@ this->storeInDatabase(); return; } - // todo:blob perform put:add chunk ("") - // todo:blob perform put:wait for completion // store in db only when we successfully upload chunks this->storeInDatabase(); }