diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -4,6 +4,8 @@ #include "GlobalTools.h" #include "Tools.h" +#include "blob_client/src/lib.rs.h" + namespace comm { namespace network { namespace reactor { @@ -64,12 +66,58 @@ } response->set_backupid(this->backupID); this->holder = tools::generateHolder(this->dataHash, this->backupID); - // todo:blob perform put:initialize + try { + put_client_initialize_cxx(); + put_client_write_cxx( + tools::getBlobPutField(blob::PutRequest::DataCase::kHolder), + this->holder.c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + put_client_write_cxx( + tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash), + this->dataHash.c_str()); + + rust::String responseStr = + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably + // want to delegate performing ops + // to separate threads in the base + // reactors + // data exists? + if ((bool)tools::charPtrToInt(responseStr.c_str())) { + return std::make_unique( + grpc::Status::OK, true); + } + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } return nullptr; } case State::DATA_CHUNKS: { - // todo:blob perform put:add chunk - // (std::move(*request.mutable_newcompactionchunk()) + if (request.mutable_newcompactionchunk()->empty()) { + return std::make_unique(grpc::Status::OK); + } + try { + put_client_write_cxx( + tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk), + std::string(std::move(*request.mutable_newcompactionchunk())) + .c_str()); + put_client_blocking_read_cxx(); // todo this should be avoided + // (blocking); we should be able to + // ignore responses; we probably want to + // delegate performing ops to separate + // threads in the base reactors + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } return nullptr; } } @@ -78,8 +126,13 @@ void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - // todo:blob perform put:add chunk ("") - // todo:blob perform put:wait for completion + try { + put_client_terminate_cxx(); + } catch (std::exception &e) { + throw std::runtime_error( + e.what()); // todo in base reactors we can just handle std exception + // instead of keep rethrowing here + } // TODO add recovery data // TODO handle attachments holders