Page MenuHomePhabricator

D3636.id11692.diff
No OneTemporary

D3636.id11692.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h
@@ -1,6 +1,7 @@
#pragma once
#include "BlobGetClientReactor.h"
+#include "DatabaseEntitiesTools.h"
#include "DatabaseManager.h"
#include "ServerWriteReactorBase.h"
#include "ServiceBlobClient.h"
@@ -13,6 +14,7 @@
#include <iostream>
#include <memory>
#include <string>
+#include <vector>
namespace comm {
namespace network {
@@ -33,6 +35,9 @@
std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
ServiceBlobClient blobClient;
State state = State::COMPACTION;
+ std::vector<std::shared_ptr<database::LogItem>> logs;
+ size_t currentLogIndex = 0;
+ std::shared_ptr<database::LogItem> currentLog;
void initializeGetReactor(const std::string &holder);
@@ -83,7 +88,8 @@
this->request.userid() + "], backup id [" + this->request.backupid() +
"]");
}
- // TODO get logs
+ this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup(
+ this->request.backupid());
}
std::unique_ptr<grpc::Status>
@@ -91,23 +97,52 @@
// we make sure that the blob client's state is flushed to the main memory
// as there may be multiple threads from the pool taking over here
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- switch (this->state) {
- case State::COMPACTION: {
- this->initializeGetReactor(this->backupItem->getCompactionHolder());
- std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (dataChunk.empty()) {
- // TODO try to immediately start writing logs instead of wasting a cycle
- // sending nothing
- this->state = State::LOGS;
- return nullptr;
- }
+ if (this->state == State::COMPACTION) {
+ this->initializeGetReactor(this->backupItem->getCompactionHolder());
+ std::string dataChunk;
+ this->dataChunks->blockingRead(dataChunk);
+ if (!dataChunk.empty()) {
response->set_compactionchunk(dataChunk);
return nullptr;
}
- case State::LOGS: {
- throw std::runtime_error("unimplemented");
+ if (!this->dataChunks->isEmpty()) {
+ throw std::runtime_error(
+ "dangling data discovered after reading compaction");
+ }
+ this->getReactor = nullptr;
+ this->state = State::LOGS;
+ }
+ if (this->state == State::LOGS) {
+ // TODO make sure logs are received in correct order regardless of their size
+ if (this->logs.empty()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ if (this->currentLogIndex == this->logs.size()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ if (this->currentLogIndex > this->logs.size()) {
+ throw std::runtime_error("log index out of bound");
+ }
+ if (this->currentLog == nullptr) {
+ this->currentLog = this->logs.at(this->currentLogIndex);
+ if (this->currentLog->getPersistedInBlob()) {
+ this->initializeGetReactor(this->currentLog->getValue());
+ } else {
+ response->set_logchunk(this->currentLog->getValue());
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ return nullptr;
+ }
+ }
+ std::string dataChunk;
+ this->dataChunks->blockingRead(dataChunk);
+ if (dataChunk.empty()) {
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ } else {
+ response->set_logchunk(dataChunk);
}
+ return nullptr;
}
throw std::runtime_error("unhandled state");
}

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 6:20 AM (20 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2592821
Default Alt Text
D3636.id11692.diff (3 KB)

Event Timeline