Page MenuHomePhorge

D4571.1765232398.diff
No OneTemporary

Size
3 KB
Referenced Files
None
Subscribers
None

D4571.1765232398.diff

diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -39,12 +39,19 @@
std::vector<std::shared_ptr<database::LogItem>> logs;
size_t currentLogIndex = 0;
std::shared_ptr<database::LogItem> currentLog;
+ std::string internalBuffer;
+ std::string previousLogID;
std::condition_variable blobGetDoneCV;
std::mutex blobGetDoneCVMutex;
+ const size_t chunkLimit =
+ GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
+
void initializeGetReactor(const std::string &holder);
void nextLog();
+ std::string
+ prepareDataChunkWithPadding(const std::string &dataChunk, size_t padding);
public:
PullBackupReactor(const backup::PullBackupRequest *request);
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -59,8 +59,12 @@
this->initializeGetReactor(this->backupItem->getCompactionHolder());
}
std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!dataChunk.empty()) {
+ if (this->internalBuffer.size() < this->chunkLimit) {
+ this->dataChunks->blockingRead(dataChunk);
+ }
+ if (!dataChunk.empty() || this->internalBuffer.size() >= this->chunkLimit) {
+ dataChunk =
+ this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
response->set_compactionchunk(dataChunk);
return nullptr;
}
@@ -135,7 +139,10 @@
// we want to read the chunks from the blob through the get client until
// we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
+ if (this->internalBuffer.size() < this->chunkLimit) {
+ this->dataChunks->blockingRead(dataChunk);
+ }
+ dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
@@ -173,8 +180,30 @@
void PullBackupReactor::nextLog() {
++this->currentLogIndex;
+ this->previousLogID = this->currentLog->getLogID();
this->currentLog = nullptr;
- this->state = State::LOGS;
+}
+
+std::string PullBackupReactor::prepareDataChunkWithPadding(
+ const std::string &dataChunk,
+ size_t padding) {
+ if (dataChunk.size() > this->chunkLimit) {
+ throw std::runtime_error("received data chunk bigger than the chunk limit");
+ }
+
+ std::string chunk = std::move(this->internalBuffer) + dataChunk;
+ const size_t realSize = chunk.size() + padding;
+ if (realSize <= this->chunkLimit) {
+ return chunk;
+ }
+ const size_t bytesToStash = realSize - this->chunkLimit;
+ this->internalBuffer = std::string(chunk.end() - bytesToStash, chunk.end());
+ chunk.resize(chunk.size() - bytesToStash);
+ if (chunk.size() > this->chunkLimit) {
+ throw std::runtime_error("new data chunk incorrectly calculated");
+ }
+
+ return chunk;
}
void PullBackupReactor::terminateCallback() {

File Metadata

Mime Type
text/plain
Expires
Mon, Dec 8, 10:19 PM (4 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5851141
Default Alt Text
D4571.1765232398.diff (3 KB)

Event Timeline