diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,5 +1,7 @@ #include "PullBackupReactor.h" +#include "blob_client/src/lib.rs.h" + #include "DatabaseManager.h" namespace comm { @@ -17,7 +19,7 @@ throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } - // todo:blob perform get initialize + get_client_initialize_cxx(holder.c_str()); this->clientInitialized = true; } @@ -64,7 +66,11 @@ } std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit) { - // todo:blob perform blocking read + rust::Vec responseVec = get_client_blocking_read_cxx(); + dataChunk = (responseVec.empty()) + ? "" + : std::string(reinterpret_cast(responseVec.data())); + dataChunk.resize(responseVec.size()); } if (!dataChunk.empty() || this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) { @@ -134,7 +140,11 @@ // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { - // todo:blob perform blocking read + rust::Vec responseVec = get_client_blocking_read_cxx(); + dataChunk = (responseVec.empty()) + ? "" + : std::string(reinterpret_cast(responseVec.data())); + dataChunk.resize(responseVec.size()); } this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0); dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded); @@ -163,7 +173,10 @@ const std::string &dataChunk, size_t padding) { if (dataChunk.size() > this->chunkLimit) { - throw std::runtime_error("received data chunk bigger than the chunk limit"); + throw std::runtime_error(std::string( + "received data chunk bigger than the chunk limit: " + + std::to_string(dataChunk.size()) + "/" + + std::to_string(this->chunkLimit))); } std::string chunk = std::move(this->internalBuffer) + dataChunk; @@ -183,8 +196,7 @@ void PullBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - // todo:blob perform put:add chunk ("") - // todo:blob perform put:wait for completion + get_client_terminate_cxx(); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message());