diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -39,12 +39,19 @@ std::vector> logs; size_t currentLogIndex = 0; std::shared_ptr currentLog; + std::string internalBuffer; + std::string previousLogID; std::condition_variable blobGetDoneCV; std::mutex blobGetDoneCVMutex; + const size_t chunkLimit = + GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; + void initializeGetReactor(const std::string &holder); void nextLog(); + std::string + prepareDataChunkWithPadding(const std::string &dataChunk, size_t padding); public: PullBackupReactor(const backup::PullBackupRequest *request); diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -173,8 +173,30 @@ void PullBackupReactor::nextLog() { ++this->currentLogIndex; + this->previousLogID = this->currentLog->getLogID(); this->currentLog = nullptr; - this->state = State::LOGS; +} + +std::string PullBackupReactor::prepareDataChunkWithPadding( + const std::string &dataChunk, + size_t padding) { + if (dataChunk.size() > this->chunkLimit) { + throw std::runtime_error("received data chunk bigger than the chunk limit"); + } + + std::string chunk = std::move(this->internalBuffer) + dataChunk; + const size_t realSize = chunk.size() + padding; + if (realSize <= this->chunkLimit) { + return chunk; + } + const size_t bytesToStash = realSize - this->chunkLimit; + this->internalBuffer = std::string(chunk.end() - bytesToStash, chunk.end()); + chunk.resize(chunk.size() - bytesToStash); + if (chunk.size() > this->chunkLimit) { + throw std::runtime_error("new data chunk incorrectly calculated"); + } + + return chunk; } void PullBackupReactor::terminateCallback() {