Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3399523
D4555.id14859.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
D4555.id14859.diff
View Options
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -41,6 +41,7 @@
std::shared_ptr<database::LogItem> currentLog;
std::string internalBuffer;
std::string previousLogID;
+ bool endOfQueue = false;
std::condition_variable blobGetDoneCV;
std::mutex blobGetDoneCVMutex;
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -53,14 +53,28 @@
// we make sure that the blob client's state is flushed to the main memory
// as there may be multiple threads from the pool taking over here
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+ response->set_attachmentholders("");
+ response->set_backupid("");
+ size_t extraBytesNeeded = 0;
if (this->state == State::COMPACTION) {
response->set_backupid(this->backupItem->getBackupID());
+ extraBytesNeeded += database::BackupItem::FIELD_BACKUP_ID.size();
+ extraBytesNeeded += this->backupItem->getBackupID().size();
+
if (this->getReactor == nullptr) {
+ extraBytesNeeded += database::BackupItem::FIELD_ATTACHMENT_HOLDERS.size();
+ extraBytesNeeded += this->backupItem->getAttachmentHolders().size();
+ response->set_attachmentholders(this->backupItem->getAttachmentHolders());
this->initializeGetReactor(this->backupItem->getCompactionHolder());
}
std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!dataChunk.empty()) {
+ if (this->internalBuffer.size() < this->chunkLimit) {
+ this->dataChunks->blockingRead(dataChunk);
+ }
+ if (!dataChunk.empty() ||
+ this->internalBuffer.size() + extraBytesNeeded >= this->chunkLimit) {
+ dataChunk =
+ this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
response->set_compactionchunk(dataChunk);
return nullptr;
}
@@ -72,20 +86,11 @@
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
}
- if (!this->backupItem->getAttachmentHolders().empty()) {
- this->state = State::COMPACTION_ATTACHMENTS;
- } else {
- this->state = State::LOGS;
- }
- return nullptr;
- }
- if (this->state == State::COMPACTION_ATTACHMENTS) {
- if (this->backupItem->getAttachmentHolders().empty()) {
- throw std::runtime_error("trying to send empty backup attachments");
- }
- response->set_attachmentholders(this->backupItem->getAttachmentHolders());
this->state = State::LOGS;
- return nullptr;
+ if (!this->internalBuffer.empty()) {
+ response->set_compactionchunk(std::move(this->internalBuffer));
+ return nullptr;
+ }
}
if (this->state == State::LOGS) {
// TODO make sure logs are received in correct order regardless their size
@@ -101,6 +106,11 @@
if (!this->dataChunks->isEmpty()) {
throw std::runtime_error("dangling data discovered after reading logs");
}
+ if (!this->internalBuffer.empty()) {
+ response->set_logid(this->previousLogID);
+ response->set_logchunk(std::move(this->internalBuffer));
+ return nullptr;
+ }
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex > this->logs.size()) {
@@ -113,6 +123,13 @@
// it is only not null when we read data in chunks
if (this->currentLog == nullptr) {
this->currentLog = this->logs.at(this->currentLogIndex);
+ extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
+ extraBytesNeeded += this->currentLog->getLogID().size();
+
+ response->set_attachmentholders(this->currentLog->getAttachmentHolders());
+ extraBytesNeeded += database::LogItem::FIELD_ATTACHMENT_HOLDERS.size();
+ extraBytesNeeded += this->currentLog->getAttachmentHolders().size();
+
if (this->currentLog->getPersistedInBlob()) {
// if the item is stored in the blob, we initialize the get reactor
// and proceed
@@ -123,19 +140,23 @@
// writeResponse will take another one from the collection
response->set_logid(this->currentLog->getLogID());
response->set_logchunk(this->currentLog->getValue());
- if (!this->currentLog->getAttachmentHolders().empty()) {
- this->state = State::LOG_ATTACHMENTS;
- } else {
- this->nextLog();
- }
+ this->nextLog();
return nullptr;
}
+ } else {
+ extraBytesNeeded += database::LogItem::FIELD_LOG_ID.size();
+ extraBytesNeeded += this->currentLog->getLogID().size();
}
+ response->set_backupid(this->currentLog->getBackupID());
response->set_logid(this->currentLog->getLogID());
// we want to read the chunks from the blob through the get client until
// we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
+ if (this->internalBuffer.size() < this->chunkLimit && !this->endOfQueue) {
+ this->dataChunks->blockingRead(dataChunk);
+ }
+ this->endOfQueue = this->endOfQueue || (dataChunk.size() == 0);
+ dataChunk = this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
@@ -144,29 +165,15 @@
// next one from the logs collection.
// If there's data inside, we write it to the client and proceed.
if (dataChunk.empty()) {
- if (!this->currentLog->getAttachmentHolders().empty()) {
- this->state = State::LOG_ATTACHMENTS;
- } else {
- this->nextLog();
- }
+ this->nextLog();
return nullptr;
} else {
+ dataChunk =
+ this->prepareDataChunkWithPadding(dataChunk, extraBytesNeeded);
response->set_logchunk(dataChunk);
}
return nullptr;
}
- if (this->state == State::LOG_ATTACHMENTS) {
- if (this->currentLog == nullptr) {
- throw std::runtime_error(
- "trying to send attachments of a non-existing log item");
- }
- if (this->currentLog->getAttachmentHolders().empty()) {
- throw std::runtime_error("trying to send empty attachments");
- }
- response->set_attachmentholders(this->currentLog->getAttachmentHolders());
- this->nextLog();
- return nullptr;
- }
throw std::runtime_error("unhandled state");
}
@@ -175,6 +182,7 @@
++this->currentLogIndex;
this->previousLogID = this->currentLog->getLogID();
this->currentLog = nullptr;
+ this->endOfQueue = false;
}
std::string PullBackupReactor::prepareDataChunkWithPadding(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 3, 3:32 AM (21 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2610068
Default Alt Text
D4555.id14859.diff (6 KB)
Attached To
Mode
D4555: [services] Backup - pull backup - change logic to use att holders out of data section
Attached
Detach File
Event Timeline
Log In to Comment