diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -19,12 +19,14 @@ class BlobGetClientReactor : public ClientReadReactorBase { std::string holder; + const size_t extraBytesNeeded; std::shared_ptr> dataChunks; std::condition_variable *terminationNotifier; public: BlobGetClientReactor( const std::string &holder, + const size_t extraBytesNeeded, std::shared_ptr> dataChunks, std::condition_variable *terminationNotifier); diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -7,11 +7,15 @@ BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, + const size_t extraBytesNeeded, std::shared_ptr> dataChunks, std::condition_variable *terminationNotifier) : holder(holder), + extraBytesNeeded(extraBytesNeeded), dataChunks(dataChunks), terminationNotifier(terminationNotifier) { + this->request.set_holder(holder); + this->request.set_extrabytesneeded(extraBytesNeeded); } std::unique_ptr diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -25,9 +25,7 @@ enum class State { COMPACTION = 1, - COMPACTION_ATTACHMENTS = 2, LOGS = 3, - LOG_ATTACHMENTS = 4, }; std::shared_ptr backupItem; @@ -43,7 +41,7 @@ std::condition_variable blobGetDoneCV; std::mutex blobGetDoneCVMutex; - void initializeGetReactor(const std::string &holder); + void initializeGetReactor(const std::string &holder, const size_t extraBytesNeeded); void nextLog(); public: diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,6 +1,8 @@ #include "PullBackupReactor.h" +#include "BackupItem.h" #include "DatabaseManager.h" +#include "LogItem.h" #include @@ -15,14 +17,15 @@ dataChunks(std::make_shared>(100)) { } -void PullBackupReactor::initializeGetReactor(const std::string &holder) { +void PullBackupReactor::initializeGetReactor( + const std::string &holder, + const size_t extraBytesNeeded) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } this->getReactor.reset(new reactor::BlobGetClientReactor( - holder, this->dataChunks, &this->blobGetDoneCV)); - this->getReactor->request.set_holder(holder); + holder, extraBytesNeeded, this->dataChunks, &this->blobGetDoneCV)); this->blobClient.get(this->getReactor); } @@ -53,10 +56,17 @@ // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); + response->set_attachmentholders(""); if (this->state == State::COMPACTION) { response->set_backupid(this->backupItem->getBackupID()); if (this->getReactor == nullptr) { - this->initializeGetReactor(this->backupItem->getCompactionHolder()); + size_t extraBytesNeeded = database::BackupItem::FIELD_BACKUP_ID.size(); + extraBytesNeeded += this->backupItem->getBackupID().size(); + extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size(); + extraBytesNeeded += this->backupItem->getAttachmentHolders().size(); + response->set_attachmentholders(this->backupItem->getAttachmentHolders()); + this->initializeGetReactor( + this->backupItem->getCompactionHolder(), extraBytesNeeded); } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); @@ -72,18 +82,6 @@ throw std::runtime_error( this->getReactor->getStatusHolder()->getStatus().error_message()); } - if (!this->backupItem->getAttachmentHolders().empty()) { - this->state = State::COMPACTION_ATTACHMENTS; - } else { - this->state = State::LOGS; - } - return nullptr; - } - if (this->state == State::COMPACTION_ATTACHMENTS) { - if (this->backupItem->getAttachmentHolders().empty()) { - throw std::runtime_error("trying to send empty backup attachments"); - } - response->set_attachmentholders(this->backupItem->getAttachmentHolders()); this->state = State::LOGS; return nullptr; } @@ -113,21 +111,23 @@ // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); + response->set_attachmentholders(this->currentLog->getAttachmentHolders()); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor // and proceed - this->initializeGetReactor(this->currentLog->getValue()); + size_t extraBytesNeeded = database::LogItem::FIELD_LOG_ID.size(); + extraBytesNeeded += this->currentLog->getLogID().size(); + extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size(); + extraBytesNeeded += this->currentLog->getAttachmentHolders().size(); + this->initializeGetReactor( + this->currentLog->getValue(), extraBytesNeeded); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logid(this->currentLog->getLogID()); response->set_logchunk(this->currentLog->getValue()); - if (!this->currentLog->getAttachmentHolders().empty()) { - this->state = State::LOG_ATTACHMENTS; - } else { - this->nextLog(); - } + this->nextLog(); return nullptr; } } @@ -144,29 +144,13 @@ // next one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { - if (!this->currentLog->getAttachmentHolders().empty()) { - this->state = State::LOG_ATTACHMENTS; - } else { - this->nextLog(); - } + this->nextLog(); return nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } - if (this->state == State::LOG_ATTACHMENTS) { - if (this->currentLog == nullptr) { - throw std::runtime_error( - "trying to send attachments of a non-existing log item"); - } - if (this->currentLog->getAttachmentHolders().empty()) { - throw std::runtime_error("trying to send empty attachments"); - } - response->set_attachmentholders(this->currentLog->getAttachmentHolders()); - this->nextLog(); - return nullptr; - } throw std::runtime_error("unhandled state"); } @@ -174,7 +158,6 @@ void PullBackupReactor::nextLog() { ++this->currentLogIndex; this->currentLog = nullptr; - this->state = State::LOGS; } void PullBackupReactor::terminateCallback() {