diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -41,6 +41,7 @@
   std::shared_ptr<database::LogItem> currentLog;
   std::string internalBuffer;
   std::string previousLogID;
+  bool endOfQueue = false;
 
   std::condition_variable blobGetDoneCV;
   std::mutex blobGetDoneCVMutex;
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -53,14 +53,27 @@
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+  response->set_attachmentholders("");
+  response->set_backupid("");
+  size_t extraBytesNeeded = 0;
   if (this->state == State::COMPACTION) {
     response->set_backupid(this->backupItem->getBackupID());
+    extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size();
+    extraBytesNeeded += this->backupItem->getBackupID().size();
+
     if (this->getReactor == nullptr) {
+      extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size();
+      extraBytesNeeded += this->backupItem->getAttachmentHolders().size();
+      response->set_attachmentholders(this->backupItem->getAttachmentHolders());
       this->initializeGetReactor(this->backupItem->getCompactionHolder());
     }
     std::string dataChunk;
-    this->dataChunks->blockingRead(dataChunk);
-    if (!dataChunk.empty()) {
+    if (this->internalBuffer.size() < this->chunkLimit) {
+      this->dataChunks->blockingRead(dataChunk);
+    }
+    if (!dataChunk.empty() || this->internalBuffer.size() >= this->chunkLimit) {
+      dataChunk =
+          this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
       response->set_compactionchunk(dataChunk);
       return nullptr;
     }
@@ -72,20 +85,11 @@
       throw std::runtime_error(
           this->getReactor->getStatusHolder()->getStatus().error_message());
     }
-    if (!this->backupItem->getAttachmentHolders().empty()) {
-      this->state = State::COMPACTION_ATTACHMENTS;
-    } else {
-      this->state = State::LOGS;
-    }
-    return nullptr;
-  }
-  if (this->state == State::COMPACTION_ATTACHMENTS) {
-    if (this->backupItem->getAttachmentHolders().empty()) {
-      throw std::runtime_error("trying to send empty backup attachments");
-    }
-    response->set_attachmentholders(this->backupItem->getAttachmentHolders());
     this->state = State::LOGS;
-    return nullptr;
+    if (!this->internalBuffer.empty()) {
+      response->set_compactionchunk(std::move(this->internalBuffer));
+      return nullptr;
+    }
   }
   if (this->state == State::LOGS) {
     // TODO make sure logs are received in correct order regardless their size
@@ -101,6 +105,11 @@
       if (!this->dataChunks->isEmpty()) {
         throw std::runtime_error("dangling data discovered after reading logs");
       }
+      if (!this->internalBuffer.empty()) {
+        response->set_logid(this->previousLogID);
+        response->set_logchunk(std::move(this->internalBuffer));
+        return nullptr;
+      }
       return std::make_unique<grpc::Status>(grpc::Status::OK);
     }
     if (this->currentLogIndex > this->logs.size()) {
@@ -113,6 +122,13 @@
     // it is only not null when we read data in chunks
     if (this->currentLog == nullptr) {
       this->currentLog = this->logs.at(this->currentLogIndex);
+      extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
+      extraBytesNeeded += this->currentLog->getLogID().size();
+
+      response->set_attachmentholders(this->currentLog->getAttachmentHolders());
+      extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size();
+      extraBytesNeeded += this->currentLog->getAttachmentHolders().size();
+
       if (this->currentLog->getPersistedInBlob()) {
         // if the item is stored in the blob, we initialize the get reactor
         // and proceed
@@ -123,19 +139,23 @@
         // writeResponse will take another one from the collection
         response->set_logid(this->currentLog->getLogID());
         response->set_logchunk(this->currentLog->getValue());
-        if (!this->currentLog->getAttachmentHolders().empty()) {
-          this->state = State::LOG_ATTACHMENTS;
-        } else {
-          this->nextLog();
-        }
+        this->nextLog();
         return nullptr;
       }
+    } else {
+      extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
+      extraBytesNeeded += this->currentLog->getLogID().size();
     }
+    response->set_backupid(this->currentLog->getBackupID());
     response->set_logid(this->currentLog->getLogID());
     // we want to read the chunks from the blob through the get client until
     // we get an empty chunk - a sign of "end of chunks"
     std::string dataChunk;
-    this->dataChunks->blockingRead(dataChunk);
+    if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) {
+      this->dataChunks->blockingRead(dataChunk);
+    }
+    this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0);
+    dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
     if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
       throw std::runtime_error(
           this->getReactor->getStatusHolder()->getStatus().error_message());
@@ -144,29 +164,15 @@
     // next one from the logs collection.
     // If there's data inside, we write it to the client and proceed.
     if (dataChunk.empty()) {
-      if (!this->currentLog->getAttachmentHolders().empty()) {
-        this->state = State::LOG_ATTACHMENTS;
-      } else {
-        this->nextLog();
-      }
+      this->nextLog();
       return nullptr;
     } else {
+      dataChunk =
+          this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
       response->set_logchunk(dataChunk);
     }
     return nullptr;
   }
-  if (this->state == State::LOG_ATTACHMENTS) {
-    if (this->currentLog == nullptr) {
-      throw std::runtime_error(
-          "trying to send attachments of a non-existing log item");
-    }
-    if (this->currentLog->getAttachmentHolders().empty()) {
-      throw std::runtime_error("trying to send empty attachments");
-    }
-    response->set_attachmentholders(this->currentLog->getAttachmentHolders());
-    this->nextLog();
-    return nullptr;
-  }
 
   throw std::runtime_error("unhandled state");
 }
@@ -175,6 +181,7 @@
   ++this->currentLogIndex;
   this->previousLogID = this->currentLog->getLogID();
   this->currentLog = nullptr;
+  this->endOfQueue = false;
 }
 
 std::string PullBackupReactor::prepareDataChunkWithPadding(
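
Note: the final hunk ends at the signature of PullBackupReactor::prepareDataChunkWithPadding, whose body is outside this diff. The sketch below is not the actual implementation; it only illustrates the buffering scheme the diff relies on, under the assumption that the helper merges the leftover internalBuffer with the freshly read chunk and returns at most chunkLimit minus extraBytesNeeded bytes, stashing the remainder for the next writeResponse call. The free-function form, the parameter names, and the assumption that chunkLimit exceeds extraBytesNeeded are illustrative only.

#include <cstddef>
#include <string>

// Hypothetical stand-in for the helper referenced in the diff. It combines the
// bytes carried over from the previous write with the new chunk, returns at
// most (chunkLimit - extraBytesNeeded) bytes for the current response, and
// stores any overflow back into internalBuffer for the next call.
std::string prepareDataChunkWithPaddingSketch(
    std::string &internalBuffer,
    const std::string &dataChunk,
    const size_t chunkLimit,
    const size_t extraBytesNeeded) {
  // assumed: chunkLimit > extraBytesNeeded, so the effective limit never underflows
  const size_t limit = chunkLimit - extraBytesNeeded;
  std::string merged = internalBuffer + dataChunk;
  internalBuffer.clear();
  if (merged.size() <= limit) {
    // everything fits into a single response; nothing is carried over
    return merged;
  }
  // emit exactly `limit` bytes now and keep the rest for the next invocation
  internalBuffer = merged.substr(limit);
  merged.resize(limit);
  return merged;
}

Read this way, the new endOfQueue flag makes sense: once blockingRead yields an empty chunk the queue is treated as exhausted, so later writeResponse calls skip blockingRead and keep flushing whatever is left in internalBuffer until it is drained, at which point nextLog() resets the flag for the next log item.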