diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -8,6 +8,7 @@ /* Patch purpose: BlobGetClientReactor receives a std::condition_variable pointer and calls notify_one() on it in doneCallback(); PullBackupReactor owns that condition variable and waits on it in terminateCallback() until the get reactor reports ReactorState::DONE. This hunk adds one include; its header name is not visible in this view. */ #include #include +#include #include #include @@ -19,11 +20,13 @@ /* Adds the terminationNotifier member and a matching third constructor parameter, both raw std::condition_variable pointers. NOTE(review): the pointee has to outlive this reactor; nothing in this hunk enforces that - confirm against the owner. */ : public ClientReadReactorBase { std::string holder; std::shared_ptr> dataChunks; + std::condition_variable *terminationNotifier; public: BlobGetClientReactor( const std::string &holder, - std::shared_ptr> dataChunks); + std::shared_ptr> dataChunks, + std::condition_variable *terminationNotifier); std::unique_ptr readResponse(blob::GetResponse &response) override; diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -7,8 +7,11 @@ /* The constructor now also stores terminationNotifier. The pointer is stored as given, with no nullptr check. */ BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, - std::shared_ptr> dataChunks) - : holder(holder), dataChunks(dataChunks) { + std::shared_ptr> dataChunks, + std::condition_variable *terminationNotifier) + : holder(holder), + dataChunks(dataChunks), + terminationNotifier(terminationNotifier) { } std::unique_ptr @@ -21,6 +24,7 @@ /* doneCallback() writes an empty string to dataChunks and then calls notify_one() on terminationNotifier, which is dereferenced unconditionally. NOTE(review): this reactor is handed only the condition variable, not the mutex the waiter locks, so notify_one() is issued without that mutex; a notification sent between the waiter's state check and its wait() call can be missed - confirm how and where the DONE state is published. */ void BlobGetClientReactor::doneCallback() { this->dataChunks->write(""); + this->terminationNotifier->notify_one(); } } // namespace reactor diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -40,6 +40,9 @@ /* Adds the blobGetDoneCV and blobGetDoneCVMutex members that terminateCallback() uses to wait for the blob get reactor. */ size_t currentLogIndex = 0; std::shared_ptr currentLog; + std::condition_variable blobGetDoneCV; + std::mutex blobGetDoneCVMutex; + void initializeGetReactor(const std::string &holder); void 
nextLog(); diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -20,8 +20,8 @@ /* initializeGetReactor() now passes the address of this->blobGetDoneCV as the third constructor argument of BlobGetClientReactor. The throw shown above the reset() call happens before getReactor is assigned. */ throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - this->getReactor.reset( - new reactor::BlobGetClientReactor(holder, this->dataChunks)); + this->getReactor.reset(new reactor::BlobGetClientReactor( + holder, this->dataChunks, &this->blobGetDoneCV)); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } @@ -178,10 +178,22 @@ /* terminateCallback() now locks reactorStateMutex and blobGetDoneCVMutex, waits once on blobGetDoneCV when the get reactor's state is not DONE, and throws std::runtime_error if the state is still not DONE afterwards. It then throws on a non-OK get-reactor status and, newly, on a non-OK status of this reactor. NOTE(review): wait() is called without a predicate, so a spurious wakeup falls straight through to the 'not been terminated properly' throw; the predicate overload of wait() would re-check the state. NOTE(review): reactorStateMutex stays locked for the whole blocking wait - confirm that no path leading to the notification needs it. NOTE(review): getReactor is dereferenced unconditionally - presumably it is always assigned before this callback runs; verify, since initializeGetReactor can throw before assigning it. */ } void PullBackupReactor::terminateCallback() { + const std::lock_guard lock(this->reactorStateMutex); + std::unique_lock lockGet(this->blobGetDoneCVMutex); + if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { + this->blobGetDoneCV.wait(lockGet); + } + if (this->getReactor->getStatusHolder()->state != ReactorState::DONE) { + throw std::runtime_error("get reactor has not been terminated properly"); + } if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } + if (!this->getStatusHolder()->getStatus().ok()) { + throw std::runtime_error( + this->getStatusHolder()->getStatus().error_message()); + } } } // namespace reactor