diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h @@ -1,6 +1,7 @@ #pragma once #include "BlobGetClientReactor.h" +#include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" #include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" @@ -13,6 +14,7 @@ #include #include #include +#include namespace comm { namespace network { @@ -33,6 +35,9 @@ std::shared_ptr> dataChunks; ServiceBlobClient blobClient; State state = State::COMPACTION; + std::vector> logs; + size_t currentLogIndex = 0; + std::shared_ptr currentLog; void initializeGetReactor(const std::string &holder); @@ -83,7 +88,8 @@ this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } - // TODO get logs + this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( + this->request.backupid()); } std::unique_ptr @@ -91,23 +97,52 @@ // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::COMPACTION: { - this->initializeGetReactor(this->backupItem->getCompactionHolder()); - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (dataChunk.empty()) { - // TODO try to immediately start writing logs instead of wasting a cycle - // sending nothing - this->state = State::LOGS; - return nullptr; - } + if (this->state == State::COMPACTION) { + this->initializeGetReactor(this->backupItem->getCompactionHolder()); + std::string dataChunk; + this->dataChunks->blockingRead(dataChunk); + if (!dataChunk.empty()) { response->set_compactionchunk(dataChunk); return nullptr; } 
- case State::LOGS: { - throw std::runtime_error("unimplemented"); + if (!this->dataChunks->isEmpty()) { + throw std::runtime_error( + "dangling data discovered after reading compaction"); + } + this->getReactor = nullptr; + this->state = State::LOGS; + } + if (this->state == State::LOGS) { + // TODO make sure logs are received in correct order regardless of their size + if (this->logs.empty()) { + return std::make_unique(grpc::Status::OK); + } + if (this->currentLogIndex == this->logs.size()) { + return std::make_unique(grpc::Status::OK); + } + if (this->currentLogIndex > this->logs.size()) { + throw std::runtime_error("log index out of bound"); + } + if (this->currentLog == nullptr) { + this->currentLog = this->logs.at(this->currentLogIndex); + if (this->currentLog->getPersistedInBlob()) { + this->initializeGetReactor(this->currentLog->getValue()); + } else { + response->set_logchunk(this->currentLog->getValue()); + ++this->currentLogIndex; + this->currentLog = nullptr; + return nullptr; + } + } + std::string dataChunk; + this->dataChunks->blockingRead(dataChunk); + if (dataChunk.empty()) { + ++this->currentLogIndex; + this->currentLog = nullptr; + } else { + response->set_logchunk(dataChunk); } + return nullptr; } throw std::runtime_error("unhandled state"); }