Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3502471
D4246.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
D4246.diff
View Options
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -25,7 +25,9 @@
enum class State {
COMPACTION = 1,
- LOGS = 2,
+ COMPACTION_ATTACHMENTS = 2,
+ LOGS = 3,
+ LOG_ATTACHMENTS = 4,
};
std::shared_ptr<database::BackupItem> backupItem;
@@ -39,6 +41,7 @@
std::shared_ptr<database::LogItem> currentLog;
void initializeGetReactor(const std::string &holder);
+ void nextLog();
public:
PullBackupReactor(const backup::PullBackupRequest *request);
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -54,6 +54,7 @@
// as there may be multiple threads from the pool taking over here
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
if (this->state == State::COMPACTION) {
+ response->set_backupid(this->backupItem->getBackupID());
if (this->getReactor == nullptr) {
this->initializeGetReactor(this->backupItem->getCompactionHolder());
}
@@ -71,19 +72,32 @@
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
}
+ if (!this->backupItem->getAttachmentHolders().empty()) {
+ this->state = State::COMPACTION_ATTACHMENTS;
+ } else {
+ this->state = State::LOGS;
+ }
+ return nullptr;
+ }
+ if (this->state == State::COMPACTION_ATTACHMENTS) {
+ if (this->backupItem->getAttachmentHolders().empty()) {
+ throw std::runtime_error("trying to send empty backup attachments");
+ }
+ response->set_attachmentholders(this->backupItem->getAttachmentHolders());
this->state = State::LOGS;
+ return nullptr;
}
if (this->state == State::LOGS) {
// TODO make sure logs are received in correct order regardless their size
if (this->logs.empty()) {
- // this means that there are no logs at all so we just terminate with the
- // compaction
+ // this means that there are no logs at all so we just terminate with
+ // the compaction
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
if (this->currentLogIndex == this->logs.size()) {
- // we reached the end of the logs collection so we just want to terminate
- // either we terminate with an error if we have some dangling data
- // or with success if we don't
+ // we reached the end of the logs collection so we just want to
+ // terminate either we terminate with an error if we have some dangling
+ // data or with success if we don't
if (!this->dataChunks->isEmpty()) {
throw std::runtime_error("dangling data discovered after reading logs");
}
@@ -100,42 +114,69 @@
if (this->currentLog == nullptr) {
this->currentLog = this->logs.at(this->currentLogIndex);
if (this->currentLog->getPersistedInBlob()) {
- // if the item is stored in the blob, we initialize the get reactor and
- // proceed
+ // if the item is stored in the blob, we initialize the get reactor
+ // and proceed
this->initializeGetReactor(this->currentLog->getValue());
} else {
// if the item is persisted in the database, we just take it, send the
// data to the client and reset currentLog so the next invocation of
// writeResponse will take another one from the collection
+ response->set_logid(this->currentLog->getLogID());
response->set_logchunk(this->currentLog->getValue());
- ++this->currentLogIndex;
- this->currentLog = nullptr;
+ if (!this->currentLog->getAttachmentHolders().empty()) {
+ this->state = State::LOG_ATTACHMENTS;
+ } else {
+ this->nextLog();
+ }
return nullptr;
}
}
- // we want to read the chunks from the blob through the get client until we
- // get an empty chunk - a sign of "end of chunks"
+ response->set_logid(this->currentLog->getLogID());
+ // we want to read the chunks from the blob through the get client until
+ // we get an empty chunk - a sign of "end of chunks"
std::string dataChunk;
this->dataChunks->blockingRead(dataChunk);
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->getReactor->getStatusHolder()->getStatus().error_message());
}
- // if we get an empty chunk, we reset the currentLog so we can read the next
- // one from the logs collection.
+ // if we get an empty chunk, we reset the currentLog so we can read the
+ // next one from the logs collection.
// If there's data inside, we write it to the client and proceed.
if (dataChunk.empty()) {
- ++this->currentLogIndex;
- this->currentLog = nullptr;
+ if (!this->currentLog->getAttachmentHolders().empty()) {
+ this->state = State::LOG_ATTACHMENTS;
+ } else {
+ this->nextLog();
+ }
return nullptr;
} else {
response->set_logchunk(dataChunk);
}
return nullptr;
}
+ if (this->state == State::LOG_ATTACHMENTS) {
+ if (this->currentLog == nullptr) {
+ throw std::runtime_error(
+ "trying to send attachments of a non-existing log item");
+ }
+ if (this->currentLog->getAttachmentHolders().empty()) {
+ throw std::runtime_error("trying to send empty attachments");
+ }
+ response->set_attachmentholders(this->currentLog->getAttachmentHolders());
+ this->nextLog();
+ return nullptr;
+ }
+
throw std::runtime_error("unhandled state");
}
+void PullBackupReactor::nextLog() {
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ this->state = State::LOGS;
+}
+
void PullBackupReactor::terminateCallback() {
if (!this->getReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 21, 4:16 AM (9 h, 14 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2685855
Default Alt Text
D4246.diff (6 KB)
Attached To
Mode
D4246: [services] Backup - Send attachments from pull backup reactor
Attached
Detach File
Event Timeline
Log In to Comment