diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -39,12 +39,19 @@
   std::vector> logs;
   size_t currentLogIndex = 0;
   std::shared_ptr currentLog;
+  std::string internalBuffer;
+  std::string previousLogID;
 
   std::condition_variable blobGetDoneCV;
   std::mutex blobGetDoneCVMutex;
 
+  const size_t chunkLimit =
+      GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+
   void initializeGetReactor(const std::string &holder);
   void nextLog();
+  std::string
+  prepareDataChunkWithPadding(const std::string &dataChunk, size_t padding);
 
 public:
   PullBackupReactor(const backup::PullBackupRequest *request);
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -59,8 +59,12 @@
       this->initializeGetReactor(this->backupItem->getCompactionHolder());
     }
     std::string dataChunk;
-    this->dataChunks->blockingRead(dataChunk);
-    if (!dataChunk.empty()) {
+    if (this->internalBuffer.size() < this->chunkLimit) {
+      this->dataChunks->blockingRead(dataChunk);
+    }
+    if (!dataChunk.empty() || this->internalBuffer.size() >= this->chunkLimit) {
+      dataChunk =
+          this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
       response->set_compactionchunk(dataChunk);
       return nullptr;
     }
@@ -135,7 +139,10 @@
     // we want to read the chunks from the blob through the get client until
     // we get an empty chunk - a sign of "end of chunks"
     std::string dataChunk;
-    this->dataChunks->blockingRead(dataChunk);
+    if (this->internalBuffer.size() < this->chunkLimit) {
+      this->dataChunks->blockingRead(dataChunk);
+    }
+    dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
     if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
       throw std::runtime_error(
           this->getReactor->getStatusHolder()->getStatus().error_message());
@@ -173,8 +180,42 @@
 
 void PullBackupReactor::nextLog() {
   ++this->currentLogIndex;
+  this->previousLogID = this->currentLog->getLogID();
   this->currentLog = nullptr;
-  this->state = State::LOGS;
+}
+
+// Prepends the bytes stashed in `internalBuffer` to `dataChunk` and returns
+// at most `chunkLimit - padding` bytes of the result; the tail that does not
+// fit is stashed in `internalBuffer` for the next call.
+// Throws std::runtime_error if `dataChunk` or `padding` exceeds `chunkLimit`.
+std::string PullBackupReactor::prepareDataChunkWithPadding(
+    const std::string &dataChunk,
+    size_t padding) {
+  if (dataChunk.size() > this->chunkLimit) {
+    throw std::runtime_error("received data chunk bigger than the chunk limit");
+  }
+  // with padding > chunkLimit the stash size computed below would exceed
+  // chunk.size() and the offset passed to substr() would underflow
+  if (padding > this->chunkLimit) {
+    throw std::runtime_error("padding bigger than the chunk limit");
+  }
+
+  std::string chunk = std::move(this->internalBuffer) + dataChunk;
+  // a moved-from std::string is left in a valid but unspecified state;
+  // clear it so the stash is guaranteed to be empty from here on
+  this->internalBuffer.clear();
+  const size_t realSize = chunk.size() + padding;
+  if (realSize <= this->chunkLimit) {
+    return chunk;
+  }
+  const size_t bytesToStash = realSize - this->chunkLimit;
+  this->internalBuffer = chunk.substr(chunk.size() - bytesToStash);
+  chunk.resize(chunk.size() - bytesToStash);
+  if (chunk.size() > this->chunkLimit) {
+    throw std::runtime_error("new data chunk incorrectly calculated");
+  }
+
+  return chunk;
 }
 
 void PullBackupReactor::terminateCallback() {