diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h index 032e7a7d3..066aa15bc 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,88 +1,100 @@ #pragma once #include "ClientBidiReactorBase.h" #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include +#include #include #include #include namespace comm { namespace network { namespace reactor { class BlobPutClientReactor : public ClientBidiReactorBase { enum class State { SEND_HOLDER = 0, SEND_HASH = 1, SEND_CHUNKS = 2, }; State state = State::SEND_HOLDER; const std::string hash; const std::string holder; size_t currentDataSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; + std::condition_variable *terminationNotifier; public: - BlobPutClientReactor(const std::string &holder, const std::string &hash); + BlobPutClientReactor( + const std::string &holder, + const std::string &hash, + std::condition_variable *terminationNotifier); void scheduleSendingDataChunk(std::unique_ptr dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) override; + void doneCallback() override; }; BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, - const std::string &hash) + const std::string &hash, + std::condition_variable *terminationNotifier) : holder(holder), hash(hash), - dataChunks(folly::MPMCQueue(100)) { + dataChunks(folly::MPMCQueue(100)), + terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk( std::unique_ptr dataChunk) { if (!this->dataChunks.write(std::move(*dataChunk))) { throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } } std::unique_ptr BlobPutClientReactor::prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) { if (this->state == State::SEND_HOLDER) { this->request.set_holder(this->holder); this->state = State::SEND_HASH; return nullptr; } if (this->state == State::SEND_HASH) { request.set_blobhash(this->hash); this->state = State::SEND_CHUNKS; return nullptr; } if (previousResponse->dataexists()) { return std::make_unique(grpc::Status::OK); } std::string dataChunk; this->dataChunks.blockingRead(dataChunk); if (dataChunk.empty()) { return std::make_unique(grpc::Status::OK); } request.set_datachunk(dataChunk); return nullptr; } +void BlobPutClientReactor::doneCallback() { + this->terminationNotifier->notify_one(); +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h index 76ecc73a9..0d7998552 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h @@ -1,51 +1,42 @@ #pragma once #include "BlobPutClientReactor.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { class ServiceBlobClient { std::unique_ptr stub; +public: ServiceBlobClient() { - // TODO: handle other types of connection + // todo handle different types of connection(e.g. load balancer) std::string targetStr = "blob-server:50051"; std::shared_ptr channel = grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); this->stub = blob::BlobService::NewStub(channel); } -public: - static ServiceBlobClient &getInstance() { - // todo consider threads - static ServiceBlobClient instance; - return instance; - } - - std::unique_ptr putReactor; - - void put(const std::string &holder, const std::string &hash) { - if (this->putReactor != nullptr && !this->putReactor->isDone()) { + void put(std::shared_ptr putReactor) { + if (putReactor == nullptr) { throw std::runtime_error( - "trying to run reactor while the previous one is not finished yet"); + "put reactor is being used but has not been initialized"); } - this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash)); - this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor)); - this->putReactor->nextWrite(); + this->stub->async()->Put(&putReactor->context, &(*putReactor)); + putReactor->nextWrite(); } // void get(const std::string &holder); // void remove(const std::string &holder); }; } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h index 9e976f3c8..93532741d 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -1,94 +1,100 @@ #pragma once #include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" #include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include #include +#include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { KEY_ENTROPY = 1, DATA_HASH = 2, DATA_CHUNKS = 3, }; State state = State::KEY_ENTROPY; std::string keyEntropy; std::string dataHash; std::string backupID; + std::shared_ptr putReactor; + ServiceBlobClient blobClient; + std::mutex reactorStateMutex; + std::condition_variable blobDoneCV; + std::mutex blobDoneCVMutex; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void doneCallback(); }; std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { + // we make sure that the blob client's state is flushed to the main memory + // as there may be multiple threads from the pool taking over here + const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); - ServiceBlobClient::getInstance().put(this->backupID, this->dataHash); + this->putReactor = std::make_shared( + this->backupID, this->dataHash, &this->blobDoneCV); + this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { - // TODO initialize blob client reactor - if (ServiceBlobClient::getInstance().putReactor == nullptr) { - throw std::runtime_error( - "blob client reactor has not been initialized"); - } - - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( - std::make_unique( - std::move(*request.mutable_newcompactionchunk()))); - + this->putReactor->scheduleSendingDataChunk(std::make_unique( + std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::doneCallback() { - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( - std::make_unique("")); + const std::lock_guard lock(this->reactorStateMutex); + this->putReactor->scheduleSendingDataChunk(std::make_unique("")); + std::unique_lock lock2(this->blobDoneCVMutex); + this->blobDoneCV.wait(lock2); } } // namespace reactor } // namespace network } // namespace comm