diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h @@ -1,6 +1,7 @@ #pragma once #include "BlobGetClientReactor.h" +#include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" #include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" @@ -14,6 +15,7 @@ #include #include #include +#include namespace comm { namespace network { @@ -34,6 +36,9 @@ std::shared_ptr> dataChunks; ServiceBlobClient blobClient; State state = State::COMPACTION; + std::vector> logs; + size_t currentLogIndex = 0; + std::shared_ptr currentLog; void initializeGetReactor(const std::string &holder); @@ -83,7 +88,8 @@ this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } - // TODO get logs + this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( + this->request.backupid()); } std::unique_ptr @@ -96,17 +102,49 @@ this->initializeGetReactor(this->backupItem->getCompactionHolder()); std::string dataChunk; this->dataChunks->blockingRead(dataChunk); - if (dataChunk.empty()) { - // TODO try to immediately start writing logs instead of wasting a cycle - // sending nothing - this->state = State::LOGS; + if (!dataChunk.empty()) { + response->set_compactionchunk(dataChunk); return nullptr; + } else { + if (!this->dataChunks->isEmpty()) { + throw std::runtime_error( + "dangling data discovered after reading compaction"); + } + this->getReactor = nullptr; + this->state = State::LOGS; + // WARNING: intentionally letting the flow enter case State::LOGS from + // here, because we want to start sending logs right away instead of + // going one more cycle } - response->set_compactionchunk(dataChunk); - return nullptr; } case State::LOGS: { - 
throw std::runtime_error("unimplemented"); + // TODO make sure logs are received in correct order regardless of their size + if (this->logs.empty()) { + return std::make_unique(grpc::Status::OK); + } + if (this->currentLogIndex >= this->logs.size()) { + return std::make_unique(grpc::Status::OK); + } + if (this->currentLog == nullptr) { + this->currentLog = this->logs.at(this->currentLogIndex); + if (this->currentLog->getPersistedInBlob()) { + this->initializeGetReactor(this->currentLog->getValue()); + } else { + response->set_logchunk(this->currentLog->getValue()); + ++this->currentLogIndex; + this->currentLog = nullptr; + return nullptr; + } + } + std::string dataChunk; + this->dataChunks->blockingRead(dataChunk); + if (dataChunk.empty()) { + ++this->currentLogIndex; + this->currentLog = nullptr; + } else { + response->set_logchunk(dataChunk); + } + return nullptr; } } throw std::runtime_error("unhandled state");