Page MenuHomePhabricator

D4555.id14859.diff
No OneTemporary

D4555.id14859.diff

diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -41,6 +41,7 @@
std::shared_ptr<database::LogItem> currentLog;
std::string internalBuffer;
std::string previousLogID;
+ bool endOfQueue = false;
std::condition_variable blobGetDoneCV;
std::mutex blobGetDoneCVMutex;
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -53,14 +53,28 @@
// we make sure that the blob client's state is flushed to the main memory
// as there may be multiple threads from the pool taking over here
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+ response->set_attachmentholders("");
+ response->set_backupid("");
+ size_t extraBytesNeeded = 0;
if (this->state == State::COMPACTION) {
response->set_backupid(this->backupItem->getBackupID());
+ extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size();
+ extraBytesNeeded += this->backupItem->getBackupID().size();
+
if (this->getReactor == nullptr) {
+ extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size();
+ extraBytesNeeded += this->backupItem->getAttachmentHolders().size();
+ response->set_attachmentholders(this->backupItem->getAttachmentHolders());
this->initializeGetReactor(this->backupItem->getCompactionHolder());
}
std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!dataChunk.empty()) {
+ if (this->internalBuffer.size() < this->chunkLimit) {
+ this->dataChunks->blockingRead(dataChunk);
+ }
+ if (!dataChunk.empty() ||
+ this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) {
+ dataChunk =
+ this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
response->set_compactionchunk(dataChunk);
return nullptr;
}
@@ -72,20 +86,11 @@
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
}
- if (!this->backupItem->getAttachmentHolders().empty()) {
- this->state = State::COMPACTION_ATTACHMENTS;
- } else {
- this->state = State::LOGS;
- }
- return nullptr;
- }
- if (this->state == State::COMPACTION_ATTACHMENTS) {
- if (this->backupItem->getAttachmentHolders().empty()) {
- throw std::runtime_error("trying to send empty backup attachments");
- }
- response->set_attachmentholders(this->backupItem->getAttachmentHolders());
this->state = State::LOGS;
- return nullptr;
+ if (!this->internalBuffer.empty()) {
+ response->set_compactionchunk(std::move(this->internalBuffer));
+ return nullptr;
+ }
}
if (this->state == State::LOGS) {
// TODO make sure logs are received in the correct order regardless of their size
@@ -101,6 +106,11 @@
if (!this->dataChunks->isEmpty()) {
throw std::runtime_error("dangling data discovered after reading logs");
}
+ if (!this->internalBuffer.empty()) {
+ response->set_logid(this->previousLogID);
+ response->set_logchunk(std::move(this->internalBuffer));
+ return nullptr;
+ }
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex > this->logs.size()) {
@@ -113,6 +123,13 @@
// it is non-null only while we are reading data in chunks
if (this->currentLog == nullptr) {
this->currentLog = this->logs.at(this->currentLogIndex);
+ extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
+ extraBytesNeeded += this->currentLog->getLogID().size();
+
+ response->set_attachmentholders(this->currentLog->getAttachmentHolders());
+ extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size();
+ extraBytesNeeded += this->currentLog->getAttachmentHolders().size();
+
if (this->currentLog->getPersistedInBlob()) {
// if the item is stored in the blob, we initialize the get reactor
// and proceed
@@ -123,19 +140,23 @@
// writeResponse will take another one from the collection
response->set_logid(this->currentLog->getLogID());
response->set_logchunk(this->currentLog->getValue());
- if (!this->currentLog->getAttachmentHolders().empty()) {
- this->state = State::LOG_ATTACHMENTS;
- } else {
- this->nextLog();
- }
+ this->nextLog();
return nullptr;
}
+ } else {
+ extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
+ extraBytesNeeded += this->currentLog->getLogID().size();
}
+ response->set_backupid(this->currentLog->getBackupID());
response->set_logid(this->currentLog->getLogID());
// we want to read the chunks from the blob through the get client until
// we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
+ if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) {
+ this->dataChunks->blockingRead(dataChunk);
+ }
+ this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0);
+ dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
@@ -144,29 +165,15 @@
// next one from the logs collection.
// If there's data inside, we write it to the client and proceed.
if (dataChunk.empty()) {
- if (!this->currentLog->getAttachmentHolders().empty()) {
- this->state = State::LOG_ATTACHMENTS;
- } else {
- this->nextLog();
- }
+ this->nextLog();
return nullptr;
} else {
+ dataChunk =
+ this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
response->set_logchunk(dataChunk);
}
return nullptr;
}
- if (this->state == State::LOG_ATTACHMENTS) {
- if (this->currentLog == nullptr) {
- throw std::runtime_error(
- "trying to send attachments of a non-existing log item");
- }
- if (this->currentLog->getAttachmentHolders().empty()) {
- throw std::runtime_error("trying to send empty attachments");
- }
- response->set_attachmentholders(this->currentLog->getAttachmentHolders());
- this->nextLog();
- return nullptr;
- }
throw std::runtime_error("unhandled state");
}
@@ -175,6 +182,7 @@
++this->currentLogIndex;
this->previousLogID = this->currentLog->getLogID();
this->currentLog = nullptr;
+ this->endOfQueue = false;
}
std::string PullBackupReactor::prepareDataChunkWithPadding(

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 3, 3:32 AM (21 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2610068
Default Alt Text
D4555.id14859.diff (6 KB)

Event Timeline