diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -25,7 +25,9 @@ enum class State { COMPACTION = 1, - LOGS = 2, + COMPACTION_ATTACHMENTS = 2, + LOGS = 3, + LOG_ATTACHMENTS = 4, }; std::shared_ptr backupItem; @@ -39,6 +41,7 @@ std::shared_ptr currentLog; void initializeGetReactor(const std::string &holder); + void nextLog(); public: PullBackupReactor(const backup::PullBackupRequest *request); diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -54,6 +54,7 @@ // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { + response->set_id(this->backupItem->getBackupID()); if (this->getReactor == nullptr) { this->initializeGetReactor(this->backupItem->getCompactionHolder()); } @@ -71,19 +72,32 @@ throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } + if (!this->backupItem->getAttachmentHolders().empty()) { + this->state = State::COMPACTION_ATTACHMENTS; + } else { + this->state = State::LOGS; + } + return nullptr; + } + if (this->state == State::COMPACTION_ATTACHMENTS) { + if (this->backupItem->getAttachmentHolders().empty()) { + throw std::runtime_error("trying to send empty backup attachments"); + } + response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->state = State::LOGS; + return nullptr; } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { - // this means that there are no logs at all so we just terminate with the - // compaction + // this means that there are no logs at all so we just terminate with + // the compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { - // we reached the end of the logs collection so we just want to terminate - // either we terminate with an error if we have some dangling data - // or with success if we don't + // we reached the end of the logs collection so we just want to + // terminate either we terminate with an error if we have some dangling + // data or with success if we don't if (!this->dataChunks->isEmpty()) { throw std::runtime_error("dangling data discovered after reading logs"); } @@ -100,42 +114,69 @@ if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); if (this->currentLog->getPersistedInBlob()) { - // if the item is stored in the blob, we initialize the get reactor and - // proceed + // if the item is stored in the blob, we initialize the get reactor + // and proceed this->initializeGetReactor(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection + response->set_id(this->currentLog->getLogID()); response->set_logchunk(this->currentLog->getValue()); - ++this->currentLogIndex; - this->currentLog = nullptr; + if (!this->currentLog->getAttachmentHolders().empty()) { + this->state = State::LOG_ATTACHMENTS; + } else { + this->nextLog(); + } return nullptr; } } - // we want to read the chunks from the blob through the get client until we - // get an empty chunk - a sign of "end of chunks" + response->set_id(this->currentLog->getLogID()); + // we want to read the chunks from the blob through the get client until + // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } - // if we get an empty chunk, we reset the currentLog so we can read the next - // one from the logs collection. + // if we get an empty chunk, we reset the currentLog so we can read the + // next one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { - ++this->currentLogIndex; - this->currentLog = nullptr; + if (!this->currentLog->getAttachmentHolders().empty()) { + this->state = State::LOG_ATTACHMENTS; + } else { + this->nextLog(); + } return nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } + if (this->state == State::LOG_ATTACHMENTS) { + if (this->currentLog == nullptr) { + throw std::runtime_error( + "trying to send attachments of a non-existing log item"); + } + if (this->currentLog->getAttachmentHolders().empty()) { + throw std::runtime_error("trying to send empty attachments"); + } + response->set_attachmentholders(this->currentLog->getAttachmentHolders()); + this->nextLog(); + return nullptr; + } + throw std::runtime_error("unhandled state"); } +void PullBackupReactor::nextLog() { + ++this->currentLogIndex; + this->currentLog = nullptr; + this->state = State::LOGS; +} + void PullBackupReactor::terminateCallback() { if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error(