diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h new file mode 100644 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -0,0 +1,119 @@ +#pragma once + +#include "ClientBidiReactorBase.h" +#include "Constants.h" + +#include "../_generated/blob.grpc.pb.h" +#include "../_generated/blob.pb.h" + +#include +#include + +#include +#include +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +class BlobPutClientReactor + : public ClientBidiReactorBase { + + enum class State { + SEND_HOLDER = 0, + SEND_HASH = 1, + SEND_CHUNKS = 2, + }; + + State state = State::SEND_HOLDER; + const std::string hash; + const std::string holder; + size_t currentDataSize = 0; + const size_t chunkSize = + GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; + folly::MPMCQueue dataChunks; + +public: + BlobPutClientReactor(const std::string &holder, const std::string &hash); + void scheduleSendingDataChunk(const std::string &dataChunk); + std::unique_ptr prepareRequest( + blob::PutRequest &request, + std::shared_ptr previousResponse) override; + void doneCallback() override; +}; + +BlobPutClientReactor::BlobPutClientReactor( + const std::string &holder, + const std::string &hash) + : holder(holder), + hash(hash), + dataChunks(folly::MPMCQueue(20)) { +} + +void BlobPutClientReactor::scheduleSendingDataChunk( + const std::string &dataChunk) { + // TODO: we may be copying a big chunk of data, but `write` seems to only + // accept `std::move` + std::cout << "[BC] here schedule sending data chunks 1: " + << std::hash{}(std::this_thread::get_id()) + << std::endl; + std::string str = std::string(dataChunk); + // std::cout << "here schedule sending data chunks 1.1" << std::endl; + // std::unique_ptr upt = std::make_unique(str); + // std::cout << "here schedule sending data chunks 1.2" << std::endl; + if (!this->dataChunks.write(std::move(str))) { + std::cout << "here schedule sending data chunks 2" << std::endl; + throw std::runtime_error( + "Error scheduling sending a data chunk to send to the blob service"); + } + // this->dataChunks.blockingWrite(std::move(str)); + std::cout << "[BC] here schedule sending data chunks 3" << std::endl; +} + +std::unique_ptr BlobPutClientReactor::prepareRequest( + blob::PutRequest &request, + std::shared_ptr previousResponse) { + std::cout << "[BC] here blob put reactor entry" << std::endl; + if (this->state == State::SEND_HOLDER) { + std::cout << "[BC] here blob put reactor send holder" << std::endl; + this->request.set_holder(this->holder); + this->state = State::SEND_HASH; + return nullptr; + } + if (this->state == State::SEND_HASH) { + std::cout << "[BC] here blob put reactor send hash" << std::endl; + request.set_blobhash(this->hash); + this->state = State::SEND_CHUNKS; + return nullptr; + } + std::cout << "[BC] here blob put reactor data chunks" << std::endl; + if (previousResponse->dataexists()) { + std::cout << "data exists - aborting" << std::endl; + return std::make_unique(grpc::Status::OK); + } + std::cout << "[BC] data NOT exists - continue" << std::endl; + std::string dataChunk; + std::cout << "[BC] here blob put reactor waiting for a chunk: " + << std::hash{}(std::this_thread::get_id()) + << std::endl; + this->dataChunks.blockingRead(dataChunk); + std::cout << "[BC] read from the queue " << dataChunk.size() << std::endl; + if (dataChunk.empty()) { + std::cout << "[BC] empty message - terminating " << dataChunk.size() + << std::endl; + return std::make_unique(grpc::Status::OK); + } + request.set_datachunk(dataChunk); + return nullptr; +} + +void BlobPutClientReactor::doneCallback() { + std::cout << "[BC] blob put client done " << this->status.error_code() << "/" + << this->status.error_message() << std::endl; +} + +} // namespace reactor +} // namespace network +} // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h new file mode 100644 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h @@ -0,0 +1,51 @@ +#pragma once + +#include "BlobPutClientReactor.h" + +#include "../_generated/blob.grpc.pb.h" +#include "../_generated/blob.pb.h" + +#include + +#include +#include +#include + +namespace comm { +namespace network { + +class ServiceBlobClient { + std::unique_ptr stub; + + ServiceBlobClient() { + std::string targetStr = "blob-server:50051"; + std::shared_ptr channel = + grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); + this->stub = blob::BlobService::NewStub(channel); + } + +public: + static ServiceBlobClient &getInstance() { + // todo consider threads + static ServiceBlobClient instance; + return instance; + } + + std::unique_ptr putReactor; + + void put(const std::string &holder, const std::string &hash) { + std::cout << "blob client - put initialize" << std::endl; + if (this->putReactor != nullptr && !this->putReactor->isDone()) { + throw std::runtime_error( + "trying to run reactor while the previous one is not finished yet"); + } + this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash)); + this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor)); + this->putReactor->nextWrite(); + } + // void get(const std::string &holder); + // void remove(const std::string &holder); +}; + +} // namespace network +} // namespace comm