diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -41,6 +41,7 @@ std::string previousLogID; bool endOfQueue = false; bool clientInitialized = false; + std::unique_ptr currentLogHolder; const size_t chunkLimit = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -66,7 +66,8 @@ } std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit) { - rust::Vec responseVec = get_client_blocking_read_cxx(); + rust::Vec responseVec = get_client_blocking_read_cxx( + this->backupItem->getCompactionHolder().c_str()); dataChunk = (responseVec.empty()) ? "" : std::string(reinterpret_cast(responseVec.data())); @@ -121,6 +122,8 @@ // if the item is stored in the blob, we initialize the get reactor // and proceed this->initializeGetReactor(this->currentLog->getValue()); + this->currentLogHolder = + std::make_unique(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of @@ -140,7 +143,8 @@ // we get an empty chunk - a sign of "end of chunks" std::string dataChunk; if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) { - rust::Vec responseVec = get_client_blocking_read_cxx(); + rust::Vec responseVec = + get_client_blocking_read_cxx(this->currentLogHolder->c_str()); dataChunk = (responseVec.empty()) ? "" : std::string(reinterpret_cast(responseVec.data())); @@ -167,6 +171,10 @@ this->previousLogID = this->currentLog->getLogID(); this->currentLog = nullptr; this->endOfQueue = false; + if (this->currentLogHolder != nullptr) { + get_client_terminate_cxx(this->currentLogHolder->c_str()); + this->currentLogHolder = nullptr; + } } std::string PullBackupReactor::prepareDataChunkWithPadding( @@ -196,7 +204,7 @@ void PullBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - get_client_terminate_cxx(); + get_client_terminate_cxx(this->backupItem->getCompactionHolder().c_str()); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message());