diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h index 31a674211..3ee74f097 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h @@ -1,96 +1,117 @@ #pragma once #include "BlobGetClientReactor.h" #include "DatabaseManager.h" #include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class PullBackupReactor : public ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { enum class State { COMPACTION = 1, LOGS = 2, }; std::shared_ptr backupItem; std::shared_ptr getReactor; std::mutex reactorStateMutex; std::shared_ptr> dataChunks; ServiceBlobClient blobClient; State state = State::COMPACTION; void initializeGetReactor(const std::string &holder); public: PullBackupReactor(const backup::PullBackupRequest *request); void initialize() override; std::unique_ptr writeResponse(backup::PullBackupResponse *response) override; }; PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } if (this->getReactor == nullptr) { this->getReactor = std::make_shared( holder, this->dataChunks); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } // TODO get logs } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { - throw std::runtime_error("unimplemented"); + // we make sure that the blob client's state is flushed to the main memory + // as there may be multiple threads from the pool taking over here + const std::lock_guard lock(this->reactorStateMutex); + switch (this->state) { + case State::COMPACTION: { + this->initializeGetReactor(this->backupItem->getCompactionHolder()); + std::string dataChunk; + this->dataChunks->blockingRead(dataChunk); + if (dataChunk.empty()) { + // TODO try to immediately start writing logs instead of wasting a cycle + // sending nothing + this->state = State::LOGS; + return nullptr; + } + response->set_compactionchunk(dataChunk); + return nullptr; + } + case State::LOGS: { + throw std::runtime_error("unimplemented"); + } + } + throw std::runtime_error("unhandled state"); } } // namespace reactor } // namespace network } // namespace comm