diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
@@ -42,16 +42,24 @@
   // true)
   std::string value;
   std::mutex reactorStateMutex;
-  std::condition_variable blobDoneCV;
-  std::mutex blobDoneCVMutex;
+
+  // handed to putReactor at construction and waited on until it is done
+  std::condition_variable blobPutDoneCV;
+  std::mutex blobPutDoneCVMutex;
+
+  // handed to holderReactor at construction and waited on until it is done
+  std::condition_variable blobAppendHolderDoneCV;
+  std::mutex blobAppendHolderDoneCVMutex;
 
   std::shared_ptr putReactor;
+  std::shared_ptr holderReactor;
   ServiceBlobClient blobClient;
 
   void storeInDatabase();
   std::string generateHolder();
   std::string generateLogID();
   void initializePutReactor();
+  void initializeHolderReactor();
 
   void storeInBlob(const std::string &data) {
   }
@@ -98,11 +106,31 @@
   }
   if (this->putReactor == nullptr) {
     this->putReactor = std::make_shared(
-        this->value, this->hash, &this->blobDoneCV);
+        this->value, this->hash, &this->blobPutDoneCV);
     this->blobClient.put(this->putReactor);
   }
 }
 
+// Creates the holder reactor from `value` and `hash` and passes it to
+// blobClient.appendHolder(). Throws std::runtime_error if either field is
+// empty; after those checks it does nothing when the reactor already exists.
+void SendLogReactor::initializeHolderReactor() {
+  if (this->value.empty()) {
+    throw std::runtime_error(
+        "holder reactor cannot be initialized with empty value");
+  }
+  if (this->hash.empty()) {
+    throw std::runtime_error(
+        "holder reactor cannot be initialized with empty hash");
+  }
+  if (this->holderReactor == nullptr) {
+    this->holderReactor =
+        std::make_shared(
+            this->value, this->hash, &this->blobAppendHolderDoneCV);
+    this->blobClient.appendHolder(this->holderReactor);
+  }
+}
+
 std::unique_ptr
 SendLogReactor::readRequest(backup::SendLogRequest request) {
   // we make sure that the blob client's state is flushed to the main memory
@@ -188,14 +216,31 @@
     return;
   }
   this->putReactor->scheduleSendingDataChunk(std::make_unique(""));
-  std::unique_lock lock2(this->blobDoneCVMutex);
-  if (!this->putReactor->isDone()) {
-    this->blobDoneCV.wait(lock2);
-  } else if (!this->putReactor->getStatus().ok()) {
+  std::unique_lock lockPut(this->blobPutDoneCVMutex);
+  // Wait with a predicate so that a spurious wakeup cannot end the wait
+  // early. NOTE(review): assumes the put reactor reports isDone() before
+  // it notifies blobPutDoneCV - confirm against the reactor implementation.
+  this->blobPutDoneCV.wait(
+      lockPut, [this] { return this->putReactor->isDone(); });
+  // Check the status unconditionally: the previous `else if` skipped this
+  // check whenever the thread actually had to wait for the upload.
+  if (!this->putReactor->getStatus().ok()) {
     throw std::runtime_error(this->putReactor->getStatus().error_message());
   }
+  // When the put reactor reports that the data exists, start the holder
+  // reactor and wait for it in the same way before touching the database.
+  if (this->putReactor->getDataExists()) {
+    this->initializeHolderReactor();
+    std::unique_lock lockHolder(this->blobAppendHolderDoneCVMutex);
+    this->blobAppendHolderDoneCV.wait(
+        lockHolder, [this] { return this->holderReactor->isDone(); });
+    if (!this->holderReactor->getStatus().ok()) {
+      throw std::runtime_error(
+          this->holderReactor->getStatus().error_message());
+    }
+  }
   // store in db only when we successfully upload chunks
   this->storeInDatabase();
 }
 
 void SendLogReactor::doneCallback() {