diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -33,21 +34,28 @@ const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; + std::condition_variable *terminationNotifier; public: - BlobPutClientReactor(const std::string &holder, const std::string &hash); + BlobPutClientReactor( + const std::string &holder, + const std::string &hash, + std::condition_variable *terminationNotifier); void scheduleSendingDataChunk(std::string &dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) override; + void doneCallback() override; }; BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, - const std::string &hash) + const std::string &hash, + std::condition_variable *terminationNotifier) : holder(holder), hash(hash), - dataChunks(folly::MPMCQueue(100)) { + dataChunks(folly::MPMCQueue(100)), + terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk(std::string &dataChunk) { @@ -82,6 +90,10 @@ return nullptr; } +void BlobPutClientReactor::doneCallback() { + this->terminationNotifier->notify_one(); +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h @@ -17,31 +17,23 @@ class ServiceBlobClient { std::unique_ptr stub; +public: ServiceBlobClient() { - // TODO: handle other types of connection + // todo handle different types of connection(e.g. load balancer) std::string targetStr = "blob-server:50051"; std::shared_ptr channel = grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); this->stub = blob::BlobService::NewStub(channel); } -public: - static ServiceBlobClient &getInstance() { - // todo consider threads - static ServiceBlobClient instance; - return instance; - } - - std::unique_ptr putReactor; - - void put(const std::string &holder, const std::string &hash) { - if (this->putReactor != nullptr && !this->putReactor->isDone()) { + void put(std::shared_ptr putReactor) { + std::cout << "blob client - put initialize" << std::endl; + if (putReactor == nullptr) { throw std::runtime_error( - "trying to run reactor while the previous one is not finished yet"); + "put reactor is being used but has not been initialized"); } - this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash)); - this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor)); - this->putReactor->nextWrite(); + this->stub->async()->Put(&putReactor->context, &(*putReactor)); + putReactor->nextWrite(); } // void get(const std::string &holder); // void remove(const std::string &holder); diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -7,7 +7,9 @@ #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include #include +#include #include namespace comm { @@ -27,6 +29,11 @@ std::string keyEntropy; std::string dataHash; std::string backupID; + std::shared_ptr putReactor; + ServiceBlobClient blobClient; + std::mutex blobPutClientReactorMutex; + std::condition_variable waitingForBlobClientCV; + std::mutex waitingForBlobClientCVMutex; std::string generateBackupID(); @@ -45,6 +52,9 @@ std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { + // we make sure that the blob client's state is flushed to the main memory + // as there may be multiple threads from the pool taking over here + const std::lock_guard lock(this->blobPutClientReactorMutex); switch (this->state) { case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { @@ -64,17 +74,13 @@ // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); - ServiceBlobClient::getInstance().put(this->backupID, this->dataHash); + this->putReactor = std::make_shared( + this->backupID, this->dataHash, &this->waitingForBlobClientCV); + this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { - // TODO initialize blob client reactor - if (ServiceBlobClient::getInstance().putReactor == nullptr) { - throw std::runtime_error( - "blob client reactor has not been initialized"); - } - - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( + this->putReactor->scheduleSendingDataChunk( *request.mutable_newcompactionchunk()); return nullptr; @@ -84,9 +90,11 @@ } void CreateNewBackupReactor::doneCallback() { + const std::lock_guard lock(this->blobPutClientReactorMutex); std::string emptyString = ""; - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( - emptyString); + this->putReactor->scheduleSendingDataChunk(emptyString); + std::unique_lock lock2(this->waitingForBlobClientCVMutex); + this->waitingForBlobClientCV.wait(lock2); } } // namespace reactor