Page MenuHomePhabricator

D4246.id13882.diff
No OneTemporary

D4246.id13882.diff

diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -25,7 +25,9 @@
enum class State {
COMPACTION = 1,
- LOGS = 2,
+ COMPACTION_ATTACHMENTS = 2,
+ LOGS = 3,
+ LOG_ATTACHMENTS = 4,
};
std::shared_ptr<database::BackupItem> backupItem;
@@ -39,6 +41,7 @@
std::shared_ptr<database::LogItem> currentLog;
void initializeGetReactor(const std::string &holder);
+ void nextLog();
public:
PullBackupReactor(const backup::PullBackupRequest *request);
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -54,6 +54,7 @@
// as there may be multiple threads from the pool taking over here
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
if (this->state == State::COMPACTION) {
+ response->set_backupid(this->backupItem->getBackupID());
if (this->getReactor == nullptr) {
this->initializeGetReactor(this->backupItem->getCompactionHolder());
}
@@ -71,19 +72,32 @@
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
}
+ if (!this->backupItem->getAttachmentHolders().empty()) {
+ this->state = State::COMPACTION_ATTACHMENTS;
+ } else {
+ this->state = State::LOGS;
+ }
+ return nullptr;
+ }
+ if (this->state == State::COMPACTION_ATTACHMENTS) {
+ if (this->backupItem->getAttachmentHolders().empty()) {
+ throw std::runtime_error("trying to send empty backup attachments");
+ }
+ response->set_attachmentholders(this->backupItem->getAttachmentHolders());
this->state = State::LOGS;
+ return nullptr;
}
if (this->state == State::LOGS) {
// TODO make sure logs are received in correct order regardless their size
if (this->logs.empty()) {
- // this means that there are no logs at all so we just terminate with the
- // compaction
+ // this means that there are no logs at all so we just terminate with
+ // the compaction
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex == this->logs.size()) {
- // we reached the end of the logs collection so we just want to terminate
- // either we terminate with an error if we have some dangling data
- // or with success if we don't
+ // we reached the end of the logs collection so we just want to
+ // terminate either we terminate with an error if we have some dangling
+ // data or with success if we don't
if (!this->dataChunks->isEmpty()) {
throw std::runtime_error("dangling data discovered after reading logs");
}
@@ -100,42 +114,69 @@
if (this->currentLog == nullptr) {
this->currentLog = this->logs.at(this->currentLogIndex);
if (this->currentLog->getPersistedInBlob()) {
- // if the item is stored in the blob, we initialize the get reactor and
- // proceed
+ // if the item is stored in the blob, we initialize the get reactor
+ // and proceed
this->initializeGetReactor(this->currentLog->getValue());
} else {
// if the item is persisted in the database, we just take it, send the
// data to the client and reset currentLog so the next invocation of
// writeResponse will take another one from the collection
+ response->set_logid(this->currentLog->getLogID());
response->set_logchunk(this->currentLog->getValue());
- ++this->currentLogIndex;
- this->currentLog = nullptr;
+ if (!this->currentLog->getAttachmentHolders().empty()) {
+ this->state = State::LOG_ATTACHMENTS;
+ } else {
+ this->nextLog();
+ }
return nullptr;
}
}
- // we want to read the chunks from the blob through the get client until we
- // get an empty chunk - a sign of "end of chunks"
+ response->set_logid(this->currentLog->getLogID());
+ // we want to read the chunks from the blob through the get client until
+ // we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
this->dataChunks->blockingRead(dataChunk);
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
}
- // if we get an empty chunk, we reset the currentLog so we can read the next
- // one from the logs collection.
+ // if we get an empty chunk, we reset the currentLog so we can read the
+ // next one from the logs collection.
// If there's data inside, we write it to the client and proceed.
if (dataChunk.empty()) {
- ++this->currentLogIndex;
- this->currentLog = nullptr;
+ if (!this->currentLog->getAttachmentHolders().empty()) {
+ this->state = State::LOG_ATTACHMENTS;
+ } else {
+ this->nextLog();
+ }
return nullptr;
} else {
response->set_logchunk(dataChunk);
}
return nullptr;
}
+ if (this->state == State::LOG_ATTACHMENTS) {
+ if (this->currentLog == nullptr) {
+ throw std::runtime_error(
+ "trying to send attachments of a non-existing log item");
+ }
+ if (this->currentLog->getAttachmentHolders().empty()) {
+ throw std::runtime_error("trying to send empty attachments");
+ }
+ response->set_attachmentholders(this->currentLog->getAttachmentHolders());
+ this->nextLog();
+ return nullptr;
+ }
+
throw std::runtime_error("unhandled state");
}
+void PullBackupReactor::nextLog() {
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ this->state = State::LOGS;
+}
+
void PullBackupReactor::terminateCallback() {
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(

File Metadata

Mime Type
text/plain
Expires
Mon, Nov 18, 12:29 PM (21 h, 59 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2534925
Default Alt Text
D4246.id13882.diff (6 KB)

Event Timeline