diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -53,14 +53,25 @@ // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); + response->set_attachmentholders(""); + response->set_backupid(""); + size_t extraBytesNeeded = 0; if (this->state == State::COMPACTION) { response->set_backupid(this->backupItem->getBackupID()); + extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size(); + extraBytesNeeded += this->backupItem->getBackupID().size(); + if (this->getReactor == nullptr) { + extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); + extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); + response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!dataChunk.empty()) { + dataChunk = + this->PrepareDataChunkWithExtraData(dataChunk, extraBytesNeeded); response->set_compactionchunk(dataChunk); return nullptr; } @@ -72,20 +83,11 @@ throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } - if (!this->backupItem->getAttachmentHolders().empty()) { - this->state = State::COMPACTION_ATTACHMENTS; - } else { - this->state = State::LOGS; - } - return nullptr; - } - if (this->state == State::COMPACTION_ATTACHMENTS) { - if (this->backupItem->getAttachmentHolders().empty()) { - throw std::runtime_error("trying to send empty backup attachments"); - } - response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->state = State::LOGS; - return nullptr; + if (!this->internalBuffer.empty()) { + response->set_compactionchunk(std::move(this->internalBuffer)); + return nullptr; + } } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size @@ -101,6 +103,11 @@ if (!this->dataChunks->isEmpty()) { throw std::runtime_error("dangling data discovered after reading logs"); } + if (!this->internalBuffer.empty()) { + response->set_logid(this->previousLogID); + response->set_logchunk(std::move(this->internalBuffer)); + return nullptr; + } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { @@ -113,6 +120,13 @@ // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); + extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); + extraBytesNeeded += this->currentLog->getLogID().size(); + + response->set_attachmentholders(this->currentLog->getAttachmentHolders()); + extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size(); + extraBytesNeeded += this->currentLog->getAttachmentHolders().size(); + if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor // and proceed @@ -123,14 +137,14 @@ // writeResponse will take another one from the collection response->set_logid(this->currentLog->getLogID()); response->set_logchunk(this->currentLog->getValue()); - if (!this->currentLog->getAttachmentHolders().empty()) { - this->state = State::LOG_ATTACHMENTS; - } else { - this->nextLog(); - } + this->nextLog(); return nullptr; } + } else { + extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size(); + extraBytesNeeded += this->currentLog->getLogID().size(); } + response->set_backupid(this->currentLog->getBackupID()); response->set_logid(this->currentLog->getLogID()); // we want to read the chunks from the blob through the get client until // we get an empty chunk - a sign of "end of chunks" @@ -144,29 +158,15 @@ // next one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { - if (!this->currentLog->getAttachmentHolders().empty()) { - this->state = State::LOG_ATTACHMENTS; - } else { - this->nextLog(); - } + this->nextLog(); return nullptr; } else { + dataChunk = + this->PrepareDataChunkWithExtraData(dataChunk, extraBytesNeeded); response->set_logchunk(dataChunk); } return nullptr; } - if (this->state == State::LOG_ATTACHMENTS) { - if (this->currentLog == nullptr) { - throw std::runtime_error( - "trying to send attachments of a non-existing log item"); - } - if (this->currentLog->getAttachmentHolders().empty()) { - throw std::runtime_error("trying to send empty attachments"); - } - response->set_attachmentholders(this->currentLog->getAttachmentHolders()); - this->nextLog(); - return nullptr; - } throw std::runtime_error("unhandled state"); }