diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h index 7aba3ab35..a563ba298 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobGetClientReactor.h @@ -1,54 +1,59 @@ #pragma once #include "ClientReadReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobGetClientReactor : public ClientReadReactorBase { std::string holder; std::shared_ptr> dataChunks; public: BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks); std::unique_ptr readResponse(blob::GetResponse &response) override; void doneCallback() override; + grpc::Status getStatus() const; }; BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks) : holder(holder), dataChunks(dataChunks) { } std::unique_ptr BlobGetClientReactor::readResponse(blob::GetResponse &response) { if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { throw std::runtime_error( "error reading compaction data from the blob service"); } return nullptr; } void BlobGetClientReactor::doneCallback() { this->dataChunks->write(""); } +grpc::Status BlobGetClientReactor::getStatus() const { + return this->status; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h index 066aa15bc..ffe03bac9 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,100 +1,105 @@ #pragma once #include "ClientBidiReactorBase.h" #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobPutClientReactor : public ClientBidiReactorBase { enum class State { SEND_HOLDER = 0, SEND_HASH = 1, SEND_CHUNKS = 2, }; State state = State::SEND_HOLDER; const std::string hash; const std::string holder; size_t currentDataSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; std::condition_variable *terminationNotifier; public: BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier); void scheduleSendingDataChunk(std::unique_ptr dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) override; void doneCallback() override; + grpc::Status getStatus() const; }; BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier) : holder(holder), hash(hash), dataChunks(folly::MPMCQueue(100)), terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk( std::unique_ptr dataChunk) { if (!this->dataChunks.write(std::move(*dataChunk))) { throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } } std::unique_ptr BlobPutClientReactor::prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) { if (this->state == State::SEND_HOLDER) { this->request.set_holder(this->holder); this->state = State::SEND_HASH; return nullptr; } if (this->state == State::SEND_HASH) { request.set_blobhash(this->hash); this->state = State::SEND_CHUNKS; return nullptr; } if (previousResponse->dataexists()) { return std::make_unique(grpc::Status::OK); } std::string dataChunk; this->dataChunks.blockingRead(dataChunk); if (dataChunk.empty()) { return std::make_unique(grpc::Status::OK); } request.set_datachunk(dataChunk); return nullptr; } void BlobPutClientReactor::doneCallback() { this->terminationNotifier->notify_one(); } +grpc::Status BlobPutClientReactor::getStatus() const { + return this->status; +} + } // namespace reactor } // namespace network } // namespace comm