diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -34,12 +34,19 @@ std::string holder; std::string backupID; std::shared_ptr putReactor; + std::shared_ptr holderReactor; + ServiceBlobClient blobClient; std::mutex reactorStateMutex; - std::condition_variable blobDoneCV; - std::mutex blobDoneCVMutex; + +/* Condition variable whose address is passed to the put reactor created in handleRequest; it is waited on together with blobPutDoneCVMutex. */ std::condition_variable blobPutDoneCV; + std::mutex blobPutDoneCVMutex; + +/* Condition variable whose address is passed to the holder reactor created in initializeHolderReactor; it is waited on together with blobAppendHolderDoneCVMutex. */ std::condition_variable blobAppendHolderDoneCV; + std::mutex blobAppendHolderDoneCVMutex; std::string generateBackupID(); + void initializeHolderReactor(); public: std::unique_ptr handleRequest( @@ -53,6 +60,23 @@ return generateRandomString(); } +/* Creates holderReactor on first use and hands it to blobClient.appendHolder. Throws std::runtime_error when holder or dataHash is empty. Takes no parameters and returns nothing; it reads this->holder and this->dataHash and writes this->holderReactor. */void CreateNewBackupReactor::initializeHolderReactor() { +/* Precondition checks: both fields must already be populated, otherwise the call fails before any reactor is built. */ if (this->holder.empty()) { + throw std::runtime_error( + "holder reactor cannot be initialized with empty holder"); + } + if (this->dataHash.empty()) { + throw std::runtime_error( + "holder reactor cannot be initialized with empty hash"); + } +/* Build and submit the reactor only when none exists yet, so a repeated call does not submit a second appendHolder request. NOTE(review): this null check and the assignment are not guarded by a lock inside this function - confirm every caller serializes access. */ if (this->holderReactor == nullptr) { + this->holderReactor = + std::make_shared( + this->holder, this->dataHash, &this->blobAppendHolderDoneCV); + this->blobClient.appendHolder(this->holderReactor); + } +} + std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { @@ -89,7 +113,7 @@ response->set_backupid(this->backupID); this->holder = this->backupID; this->putReactor = std::make_shared( - this->holder, this->dataHash, &this->blobDoneCV); + this->holder, this->dataHash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); return nullptr; } @@ -108,13 +132,24 @@ return; } 
/* An empty chunk is scheduled on the put reactor before waiting for it to finish - presumably this marks the end of the data stream; TODO confirm against the put reactor implementation. */this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lock2(this->blobDoneCVMutex); - if (this->putReactor->isDone()) { - if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); +/* lock2 is taken here and remains in scope for the rest of the visible code, including the holder-reactor wait further down. */ std::unique_lock lock2(this->blobPutDoneCVMutex); +/* A put reactor that has already finished with a non-ok status is reported as std::runtime_error carrying its error message. */ if (this->putReactor->isDone() && !this->putReactor->getStatus().ok()) { + throw std::runtime_error(this->putReactor->getStatus().error_message()); + } + if (!this->putReactor->isDone()) { +/* NOTE(review): this is the predicate-less wait overload, so a spurious wakeup returns before isDone() is true; consider wait(lock2, predicate). Also, the status is only inspected before the wait - if the put reactor finishes with an error while this thread is blocked, execution continues to getDataExists() without reporting it. Whether the notifier holds blobPutDoneCVMutex when it signals is not visible here - confirm, otherwise a notification sent between the isDone() check and the wait can be missed. */ this->blobPutDoneCV.wait(lock2); + } +/* The holder reactor is only created and awaited when the put reactor reports that the data already exists. */ if (this->putReactor->getDataExists()) { + this->initializeHolderReactor(); + std::unique_lock lockHolder(this->blobAppendHolderDoneCVMutex); + if (this->holderReactor->isDone() && + !this->holderReactor->getStatus().ok()) { + throw std::runtime_error( + this->holderReactor->getStatus().error_message()); + } + if (!this->holderReactor->isDone()) { +/* NOTE(review): same pattern as the put wait above - no predicate and no status check after waking. This wait releases only lockHolder; lock2 stays held for its whole duration. */ this->blobAppendHolderDoneCV.wait(lockHolder); } - } else { - this->blobDoneCV.wait(lock2); } try { // TODO add recovery data