diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h index 7023ee680..032e7a7d3 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,91 +1,88 @@ #pragma once #include "ClientBidiReactorBase.h" #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobPutClientReactor : public ClientBidiReactorBase { enum class State { SEND_HOLDER = 0, SEND_HASH = 1, SEND_CHUNKS = 2, }; State state = State::SEND_HOLDER; const std::string hash; const std::string holder; size_t currentDataSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; public: BlobPutClientReactor(const std::string &holder, const std::string &hash); - void scheduleSendingDataChunk(const std::string &dataChunk); + void scheduleSendingDataChunk(std::unique_ptr dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) override; }; BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash) : holder(holder), hash(hash), dataChunks(folly::MPMCQueue(100)) { } void BlobPutClientReactor::scheduleSendingDataChunk( - const std::string &dataChunk) { - // TODO: we may be copying a big chunk of data, but `write` seems to only - // accept `std::move` - std::string str = std::string(dataChunk); - if (!this->dataChunks.write(std::move(str))) { + std::unique_ptr dataChunk) { + if (!this->dataChunks.write(std::move(*dataChunk))) { throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } } std::unique_ptr BlobPutClientReactor::prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) { if (this->state == State::SEND_HOLDER) { this->request.set_holder(this->holder); this->state = State::SEND_HASH; return nullptr; } if (this->state == State::SEND_HASH) { request.set_blobhash(this->hash); this->state = State::SEND_CHUNKS; return nullptr; } if (previousResponse->dataexists()) { return std::make_unique(grpc::Status::OK); } std::string dataChunk; this->dataChunks.blockingRead(dataChunk); if (dataChunk.empty()) { return std::make_unique(grpc::Status::OK); } request.set_datachunk(dataChunk); return nullptr; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h index 4b97e8b55..9e976f3c8 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -1,92 +1,94 @@ #pragma once #include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" #include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { KEY_ENTROPY = 1, DATA_HASH = 2, DATA_CHUNKS = 3, }; State state = State::KEY_ENTROPY; std::string keyEntropy; std::string dataHash; std::string backupID; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void doneCallback(); }; std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { switch (this->state) { case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); ServiceBlobClient::getInstance().put(this->backupID, this->dataHash); return nullptr; } case State::DATA_CHUNKS: { // TODO initialize blob client reactor if (ServiceBlobClient::getInstance().putReactor == nullptr) { throw std::runtime_error( "blob client reactor has not been initialized"); } ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( - request.newcompactionchunk()); + std::make_unique( + std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::doneCallback() { - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk(""); + ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( + std::make_unique("")); } } // namespace reactor } // namespace network } // namespace comm