Page MenuHomePhabricator

D3636.id11416.diff
No OneTemporary

D3636.id11416.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
@@ -1,6 +1,7 @@
#pragma once
#include "BlobGetClientReactor.h"
+#include "DatabaseEntitiesTools.h"
#include "DatabaseManager.h"
#include "ServerWriteReactorBase.h"
#include "ServiceBlobClient.h"
@@ -13,6 +14,7 @@
#include <iostream>
#include <memory>
#include <string>
+#include <vector>
namespace comm {
namespace network {
@@ -33,6 +35,9 @@
std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
ServiceBlobClient blobClient;
State state = State::COMPACTION;
+ std::vector<std::shared_ptr<database::LogItem>> logs;
+ size_t currentLogIndex = 0;
+ std::shared_ptr<database::LogItem> currentLog;
void initializeGetReactor(const std::string &holder);
@@ -83,7 +88,8 @@
this->request.userid() + "], backup id [" + this->request.backupid() +
"]");
}
- // TODO get logs
+ this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup(
+ this->request.backupid());
}
std::unique_ptr<grpc::Status>
@@ -96,17 +102,51 @@
this->initializeGetReactor(this->backupItem->getCompactionHolder());
std::string dataChunk;
this->dataChunks->blockingRead(dataChunk);
- if (dataChunk.empty()) {
- // TODO try to immediately start writing logs instead of wasting a cycle
- // sending nothing
- this->state = State::LOGS;
+ if (!dataChunk.empty()) {
+ response->set_compactionchunk(dataChunk);
return nullptr;
+ } else {
+ if (!this->dataChunks->isEmpty()) {
+ throw std::runtime_error(
+ "dangling data discovered after reading compaction");
+ }
+ this->getReactor = nullptr;
+ this->state = State::LOGS;
+ // WARNING: intentionally letting the flow enter case State::LOGS from
+ // here, because we want to start sending logs right away instead of
+ // going one more cycle
}
- response->set_compactionchunk(dataChunk);
- return nullptr;
}
case State::LOGS: {
- throw std::runtime_error("unimplemented");
+ // TODO make sure logs are received in correct order regardless of their size
+ if (this->logs.empty()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ if (this->currentLogIndex == this->logs.size()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ } else if (this->currentLogIndex > this->logs.size()) {
+ throw std::runtime_error("log index out of bound");
+ }
+ if (this->currentLog == nullptr) {
+ this->currentLog = this->logs.at(this->currentLogIndex);
+ if (this->currentLog->getPersistedInBlob()) {
+ this->initializeGetReactor(this->currentLog->getValue());
+ } else {
+ response->set_logchunk(this->currentLog->getValue());
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ return nullptr;
+ }
+ }
+ std::string dataChunk;
+ this->dataChunks->blockingRead(dataChunk);
+ if (dataChunk.empty()) {
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ } else {
+ response->set_logchunk(dataChunk);
+ }
+ return nullptr;
}
}
throw std::runtime_error("unhandled state");

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 6:25 AM (20 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2593055
Default Alt Text
D3636.id11416.diff (3 KB)

Event Timeline