diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h new file mode 100644 index 000000000..7023ee680 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -0,0 +1,91 @@ +#pragma once + +#include "ClientBidiReactorBase.h" +#include "Constants.h" + +#include "../_generated/blob.grpc.pb.h" +#include "../_generated/blob.pb.h" + +#include +#include + +#include +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +class BlobPutClientReactor + : public ClientBidiReactorBase { + + enum class State { + SEND_HOLDER = 0, + SEND_HASH = 1, + SEND_CHUNKS = 2, + }; + + State state = State::SEND_HOLDER; + const std::string hash; + const std::string holder; + size_t currentDataSize = 0; + const size_t chunkSize = + GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; + folly::MPMCQueue dataChunks; + +public: + BlobPutClientReactor(const std::string &holder, const std::string &hash); + void scheduleSendingDataChunk(const std::string &dataChunk); + std::unique_ptr prepareRequest( + blob::PutRequest &request, + std::shared_ptr previousResponse) override; +}; + +BlobPutClientReactor::BlobPutClientReactor( + const std::string &holder, + const std::string &hash) + : holder(holder), + hash(hash), + dataChunks(folly::MPMCQueue(100)) { +} + +void BlobPutClientReactor::scheduleSendingDataChunk( + const std::string &dataChunk) { + // TODO: we may be copying a big chunk of data, but `write` seems to only + // accept `std::move` + std::string str = std::string(dataChunk); + if (!this->dataChunks.write(std::move(str))) { + throw std::runtime_error( + "Error scheduling sending a data chunk to send to the blob service"); + } +} + +std::unique_ptr BlobPutClientReactor::prepareRequest( + blob::PutRequest &request, + std::shared_ptr previousResponse) { + if (this->state == State::SEND_HOLDER) { + this->request.set_holder(this->holder); + this->state = State::SEND_HASH; + return nullptr; + } + if (this->state == State::SEND_HASH) { + request.set_blobhash(this->hash); + this->state = State::SEND_CHUNKS; + return nullptr; + } + if (previousResponse->dataexists()) { + return std::make_unique(grpc::Status::OK); + } + std::string dataChunk; + this->dataChunks.blockingRead(dataChunk); + if (dataChunk.empty()) { + return std::make_unique(grpc::Status::OK); + } + request.set_datachunk(dataChunk); + return nullptr; +} + +} // namespace reactor +} // namespace network +} // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h new file mode 100644 index 000000000..76ecc73a9 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h @@ -0,0 +1,51 @@ +#pragma once + +#include "BlobPutClientReactor.h" + +#include "../_generated/blob.grpc.pb.h" +#include "../_generated/blob.pb.h" + +#include + +#include +#include +#include + +namespace comm { +namespace network { + +class ServiceBlobClient { + std::unique_ptr stub; + + ServiceBlobClient() { + // TODO: handle other types of connection + std::string targetStr = "blob-server:50051"; + std::shared_ptr channel = + grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); + this->stub = blob::BlobService::NewStub(channel); + } + +public: + static ServiceBlobClient &getInstance() { + // todo consider threads + static ServiceBlobClient instance; + return instance; + } + + std::unique_ptr putReactor; + + void put(const std::string &holder, const std::string &hash) { + if (this->putReactor != nullptr && !this->putReactor->isDone()) { + throw std::runtime_error( + "trying to run reactor while the previous one is not finished yet"); + } + this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash)); + this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor)); + this->putReactor->nextWrite(); + } + // void get(const std::string &holder); + // void remove(const std::string &holder); +}; + +} // namespace network +} // namespace comm