diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -59,7 +59,6 @@ std::cout << "DONE [code=" << status.error_code() << "][err=" << status.error_message() << "]" << std::endl; this->done = true; - this->doneCallback(); } template @@ -91,6 +90,7 @@ void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); + this->doneCallback(); } } // namespace reactor diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/BlobPutClientReactor.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -34,9 +35,13 @@ const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; + std::condition_variable *terminationNotifier; public: - BlobPutClientReactor(const std::string &holder, const std::string &hash); + BlobPutClientReactor( + const std::string &holder, + const std::string &hash, + std::condition_variable *terminationNotifier); void scheduleSendingDataChunk(std::string &dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, @@ -46,25 +51,23 @@ BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, - const std::string &hash) + const std::string &hash, + std::condition_variable *terminationNotifier) : holder(holder), hash(hash), - dataChunks(folly::MPMCQueue(20)) { + dataChunks(folly::MPMCQueue(20)), + terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk(std::string &dataChunk) { std::cout << "[BC] here schedule sending data chunks 1: " << std::hash{}(std::this_thread::get_id()) << std::endl; - // std::cout << "here schedule sending data chunks 1.1" << std::endl; - // std::unique_ptr upt = std::make_unique(str); - // std::cout << "here schedule sending data chunks 1.2" << std::endl; if (!this->dataChunks.write(std::move(dataChunk))) { std::cout << "here schedule sending data chunks 2" << std::endl; throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } - // this->dataChunks.blockingWrite(std::move(str)); std::cout << "[BC] here schedule sending data chunks 3" << std::endl; } @@ -108,6 +111,7 @@ void BlobPutClientReactor::doneCallback() { std::cout << "[BC] blob put client done " << this->status.error_code() << "/" << this->status.error_message() << std::endl; + this->terminationNotifier->notify_one(); } } // namespace reactor diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h --- a/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/blob/ServiceBlobClient.h @@ -17,31 +17,23 @@ class ServiceBlobClient { std::unique_ptr stub; +public: ServiceBlobClient() { + // todo handle different types of connection(e.g. load balancer) std::string targetStr = "blob-server:50051"; std::shared_ptr channel = grpc::CreateChannel(targetStr, grpc::InsecureChannelCredentials()); this->stub = blob::BlobService::NewStub(channel); } -public: - static ServiceBlobClient &getInstance() { - // todo consider threads - static ServiceBlobClient instance; - return instance; - } - - std::unique_ptr putReactor; - - void put(const std::string &holder, const std::string &hash) { + void put(std::shared_ptr putReactor) { std::cout << "blob client - put initialize" << std::endl; - if (this->putReactor != nullptr && !this->putReactor->isDone()) { + if (putReactor == nullptr) { throw std::runtime_error( - "trying to run reactor while the previous one is not finished yet"); + "put reactor is being used but has not been initialized"); } - this->putReactor.reset(new reactor::BlobPutClientReactor(holder, hash)); - this->stub->async()->Put(&this->putReactor->context, &(*this->putReactor)); - this->putReactor->nextWrite(); + this->stub->async()->Put(&putReactor->context, &(*putReactor)); + putReactor->nextWrite(); } // void get(const std::string &holder); // void remove(const std::string &holder); diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -7,8 +7,10 @@ #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include #include #include +#include #include #include @@ -31,6 +33,11 @@ std::string keyEntropy; std::string dataHash; std::string backupID; + std::shared_ptr putReactor; + ServiceBlobClient blobClient; + std::mutex blobPutClientReactorMutex; + std::condition_variable waitingForBlobClientCV; + std::mutex waitingForBlobClientCVMutex; std::string generateBackupID(); @@ -39,6 +46,10 @@ backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void doneCallback(); + + virtual ~CreateNewBackupReactor() { + std::cout << "[CNR] DTOR" << std::endl; + } }; std::string CreateNewBackupReactor::generateBackupID() { @@ -49,6 +60,9 @@ std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { + // we make sure that the blob client's state is flushed to the main memory + // as there may be multiple threads from the pool taking over here + const std::lock_guard lock(this->blobPutClientReactorMutex); std::cout << "[CNR] here handle request" << std::endl; switch (this->state) { case State::KEY_ENTROPY: { @@ -70,20 +84,17 @@ // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); - ServiceBlobClient::getInstance().put(this->backupID, this->dataHash); + this->putReactor = std::make_shared( + this->backupID, this->dataHash, &this->waitingForBlobClientCV); + this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { std::cout << "[CNR] here handle request data chunk " << request.newcompactionchunk().size() << std::endl; - // TODO initialize blob client reactor - if (ServiceBlobClient::getInstance().putReactor == nullptr) { - throw std::runtime_error( - "blob client reactor has not been initialized"); - } std::cout << "[CNR] here enqueueing data chunk" << std::endl; - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( + this->putReactor->scheduleSendingDataChunk( *request.mutable_newcompactionchunk()); return nullptr; @@ -93,12 +104,17 @@ } void CreateNewBackupReactor::doneCallback() { + const std::lock_guard lock(this->blobPutClientReactorMutex); std::cout << "[CNR] create new backup done " << this->status.error_code() << "/" << this->status.error_message() << std::endl; std::cout << "[CNR] enqueueing empty chunk to end blob upload" << std::endl; std::string emptyString = ""; - ServiceBlobClient::getInstance().putReactor->scheduleSendingDataChunk( - emptyString); + this->putReactor->scheduleSendingDataChunk(emptyString); + std::cout << "[CNR] waiting for the blob client to complete" << std::endl; + std::unique_lock lock2(this->waitingForBlobClientCVMutex); + this->waitingForBlobClientCV.wait(lock2); + std::cout << "[CNR] the blob client to completed, CNR can exit gracefully" + << std::endl; } } // namespace reactor