Page MenuHomePhabricator

D3780.id11608.diff
No OneTemporary

D3780.id11608.diff

diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt
--- a/services/backup/CMakeLists.txt
+++ b/services/backup/CMakeLists.txt
@@ -36,7 +36,12 @@
set(DEV_HEADERS_PATH "./dev")
endif()
-file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp")
+file(GLOB SOURCE_CODE
+ "./src/*.cpp"
+ "./src/**/*.cpp"
+ "./src/**/**/*.cpp"
+ "./src/**/**/**/*.cpp"
+)
list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$")
foreach (ITEM ${DEV_SOURCE_CODE})
diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h
@@ -7,6 +7,7 @@
#include <memory>
#include <string>
+#include <condition_variable>
namespace comm {
namespace network {
@@ -25,25 +26,10 @@
BlobAppendHolderClientReactor(
const std::string &holder,
const std::string &hash,
- std::condition_variable *terminationNotifier)
- : terminationNotifier(terminationNotifier) {
- this->request.set_holder(holder);
- this->request.set_blobhash(hash);
- }
-
- void OnDone(const grpc::Status &status) {
- this->status = status;
- this->done = true;
- this->terminationNotifier->notify_one();
- }
-
- bool isDone() const {
- return this->done;
- }
-
- grpc::Status getStatus() const {
- return this->status;
- }
+ std::condition_variable *terminationNotifier);
+ void OnDone(const grpc::Status &status);
+ bool isDone() const;
+ grpc::Status getStatus() const;
};
} // namespace reactor
diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp
new file mode 100644
--- /dev/null
+++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp
@@ -0,0 +1,32 @@
+#include "BlobAppendHolderClientReactor.h"
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+BlobAppendHolderClientReactor::BlobAppendHolderClientReactor(
+ const std::string &holder,
+ const std::string &hash,
+ std::condition_variable *terminationNotifier)
+ : terminationNotifier(terminationNotifier) {
+ this->request.set_holder(holder);
+ this->request.set_blobhash(hash);
+}
+
+void BlobAppendHolderClientReactor::OnDone(const grpc::Status &status) {
+ this->status = status;
+ this->done = true;
+ this->terminationNotifier->notify_one();
+}
+
+bool BlobAppendHolderClientReactor::isDone() const {
+ return this->done;
+}
+
+grpc::Status BlobAppendHolderClientReactor::getStatus() const {
+ return this->status;
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
@@ -1,10 +1,10 @@
#pragma once
-#include "ClientReadReactorBase.h"
-
#include "../_generated/blob.grpc.pb.h"
#include "../_generated/blob.pb.h"
+#include "ClientReadReactorBase.h"
+
#include <folly/MPMCQueue.h>
#include <grpcpp/grpcpp.h>
@@ -31,28 +31,6 @@
grpc::Status getStatus() const;
};
-BlobGetClientReactor::BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks)
- : holder(holder), dataChunks(dataChunks) {
-}
-
-std::unique_ptr<grpc::Status>
-BlobGetClientReactor::readResponse(blob::GetResponse &response) {
- if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) {
- throw std::runtime_error("error reading data from the blob service");
- }
- return nullptr;
-}
-
-void BlobGetClientReactor::doneCallback() {
- this->dataChunks->write("");
-}
-
-grpc::Status BlobGetClientReactor::getStatus() const {
- return this->status;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
copy from services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
copy to services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
@@ -1,36 +1,10 @@
-#pragma once
-#include "ClientReadReactorBase.h"
-
-#include "../_generated/blob.grpc.pb.h"
-#include "../_generated/blob.pb.h"
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-#include <string>
+#include "BlobGetClientReactor.h"
namespace comm {
namespace network {
namespace reactor {
-class BlobGetClientReactor
- : public ClientReadReactorBase<blob::GetRequest, blob::GetResponse> {
- std::string holder;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
-
-public:
- BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks);
-
- std::unique_ptr<grpc::Status>
- readResponse(blob::GetResponse &response) override;
- void doneCallback() override;
- grpc::Status getStatus() const;
-};
-
BlobGetClientReactor::BlobGetClientReactor(
const std::string &holder,
std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks)
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -1,16 +1,16 @@
#pragma once
-#include "ClientBidiReactorBase.h"
#include "Constants.h"
#include "../_generated/blob.grpc.pb.h"
#include "../_generated/blob.pb.h"
+#include "ClientBidiReactorBase.h"
+
#include <folly/MPMCQueue.h>
#include <grpcpp/grpcpp.h>
#include <condition_variable>
-#include <iostream>
#include <memory>
#include <string>
@@ -51,62 +51,6 @@
bool getDataExists() const;
};
-BlobPutClientReactor::BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier)
- : holder(holder),
- hash(hash),
- dataChunks(folly::MPMCQueue<std::string>(100)),
- terminationNotifier(terminationNotifier) {
-}
-
-void BlobPutClientReactor::scheduleSendingDataChunk(
- std::unique_ptr<std::string> dataChunk) {
- if (!this->dataChunks.write(std::move(*dataChunk))) {
- throw std::runtime_error(
- "Error scheduling sending a data chunk to send to the blob service");
- }
-}
-
-std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) {
- if (this->state == State::SEND_HOLDER) {
- this->request.set_holder(this->holder);
- this->state = State::SEND_HASH;
- return nullptr;
- }
- if (this->state == State::SEND_HASH) {
- request.set_blobhash(this->hash);
- this->state = State::SEND_CHUNKS;
- return nullptr;
- }
- if (previousResponse->dataexists()) {
- this->dataExists = true;
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- std::string dataChunk;
- this->dataChunks.blockingRead(dataChunk);
- if (dataChunk.empty()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- request.set_datachunk(dataChunk);
- return nullptr;
-}
-
-void BlobPutClientReactor::doneCallback() {
- this->terminationNotifier->notify_one();
-}
-
-grpc::Status BlobPutClientReactor::getStatus() const {
- return this->status;
-}
-
-bool BlobPutClientReactor::getDataExists() const {
- return this->dataExists;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
copy from services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
copy to services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
@@ -1,56 +1,11 @@
-#pragma once
+#include "BlobPutClientReactor.h"
-#include "ClientBidiReactorBase.h"
-#include "Constants.h"
-
-#include "../_generated/blob.grpc.pb.h"
-#include "../_generated/blob.pb.h"
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <condition_variable>
#include <iostream>
-#include <memory>
-#include <string>
namespace comm {
namespace network {
namespace reactor {
-class BlobPutClientReactor
- : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
-
- enum class State {
- SEND_HOLDER = 0,
- SEND_HASH = 1,
- SEND_CHUNKS = 2,
- };
-
- State state = State::SEND_HOLDER;
- const std::string hash;
- const std::string holder;
- size_t currentDataSize = 0;
- const size_t chunkSize =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
- folly::MPMCQueue<std::string> dataChunks;
- std::condition_variable *terminationNotifier;
- bool dataExists = false;
-
-public:
- BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier);
- void scheduleSendingDataChunk(std::unique_ptr<std::string> dataChunk);
- std::unique_ptr<grpc::Status> prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) override;
- void doneCallback() override;
- grpc::Status getStatus() const;
- bool getDataExists() const;
-};
-
BlobPutClientReactor::BlobPutClientReactor(
const std::string &holder,
const std::string &hash,
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
@@ -1,13 +1,12 @@
#pragma once
-#include "DatabaseManager.h"
-#include "ServerBidiReactorBase.h"
#include "ServiceBlobClient.h"
-#include "Tools.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
+#include "ServerBidiReactorBase.h"
+
#include <condition_variable>
#include <memory>
#include <mutex>
@@ -55,119 +54,6 @@
void terminateCallback() override;
};
-std::string CreateNewBackupReactor::generateBackupID() {
- // mock
- return generateRandomString();
-}
-
-void CreateNewBackupReactor::initializeHolderReactor() {
- if (this->holder.empty()) {
- throw std::runtime_error(
- "holder reactor cannot be initialized with empty holder");
- }
- if (this->dataHash.empty()) {
- throw std::runtime_error(
- "holder reactor cannot be initialized with empty hash");
- }
- if (this->holderReactor == nullptr) {
- this->holderReactor =
- std::make_shared<reactor::BlobAppendHolderClientReactor>(
- this->holder, this->dataHash, &this->blobAppendHolderDoneCV);
- this->blobClient.appendHolder(this->holderReactor);
- }
-}
-
-std::unique_ptr<ServerBidiReactorStatus> CreateNewBackupReactor::handleRequest(
- backup::CreateNewBackupRequest request,
- backup::CreateNewBackupResponse *response) {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- switch (this->state) {
- case State::USER_ID: {
- if (!request.has_userid()) {
- throw std::runtime_error("user id expected but not received");
- }
- this->userID = request.userid();
- this->state = State::KEY_ENTROPY;
- return nullptr;
- }
- case State::KEY_ENTROPY: {
- if (!request.has_keyentropy()) {
- throw std::runtime_error(
- "backup key entropy expected but not received");
- }
- this->keyEntropy = request.keyentropy();
- this->state = State::DATA_HASH;
- return nullptr;
- }
- case State::DATA_HASH: {
- if (!request.has_newcompactionhash()) {
- throw std::runtime_error("data hash expected but not received");
- }
- this->dataHash = request.newcompactionhash();
- this->state = State::DATA_CHUNKS;
-
- // TODO confirm - holder may be a backup id
- this->backupID = this->generateBackupID();
- response->set_backupid(this->backupID);
- this->holder = this->backupID;
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->holder, this->dataHash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
- return nullptr;
- }
- case State::DATA_CHUNKS: {
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(
- std::move(*request.mutable_newcompactionchunk())));
- return nullptr;
- }
- }
- throw std::runtime_error("new backup - invalid state");
-}
-
-void CreateNewBackupReactor::terminateCallback() {
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->putReactor == nullptr) {
- return;
- }
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
- if (this->putReactor->isDone()) {
- if (!this->putReactor->getStatus().ok()) {
- throw std::runtime_error(this->putReactor->getStatus().error_message());
- }
- } else {
- this->blobPutDoneCV.wait(lock2);
- }
- if (this->putReactor->getDataExists()) {
- this->initializeHolderReactor();
- std::unique_lock<std::mutex> lockHolder(this->blobAppendHolderDoneCVMutex);
- if (this->holderReactor->isDone()) {
- if (!this->holderReactor->getStatus().ok()) {
- throw std::runtime_error(
- this->holderReactor->getStatus().error_message());
- }
- } else {
- this->blobAppendHolderDoneCV.wait(lockHolder);
- }
- }
- try {
- // TODO add recovery data
- // TODO handle attachments holders
- database::BackupItem backupItem(
- this->userID,
- this->backupID,
- getCurrentTimestamp(),
- generateRandomString(),
- this->holder,
- {});
- database::DatabaseManager::getInstance().putBackupItem(backupItem);
- } catch (std::runtime_error &e) {
- std::cout << "db operations error: " << e.what() << std::endl;
- }
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
copy from services/backup/src/Reactors/server/CreateNewBackupReactor.h
copy to services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -1,60 +1,12 @@
-#pragma once
+#include "CreateNewBackupReactor.h"
#include "DatabaseManager.h"
-#include "ServerBidiReactorBase.h"
-#include "ServiceBlobClient.h"
#include "Tools.h"
-#include "../_generated/backup.grpc.pb.h"
-#include "../_generated/backup.pb.h"
-
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <string>
-
namespace comm {
namespace network {
namespace reactor {
-class CreateNewBackupReactor : public ServerBidiReactorBase<
- backup::CreateNewBackupRequest,
- backup::CreateNewBackupResponse> {
- enum class State {
- USER_ID = 1,
- KEY_ENTROPY = 2,
- DATA_HASH = 3,
- DATA_CHUNKS = 4,
- };
-
- State state = State::USER_ID;
- std::string userID;
- std::string keyEntropy;
- std::string dataHash;
- std::string holder;
- std::string backupID;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- std::shared_ptr<reactor::BlobAppendHolderClientReactor> holderReactor;
-
- ServiceBlobClient blobClient;
- std::mutex reactorStateMutex;
-
- std::condition_variable blobPutDoneCV;
- std::mutex blobPutDoneCVMutex;
-
- std::condition_variable blobAppendHolderDoneCV;
- std::mutex blobAppendHolderDoneCVMutex;
-
- std::string generateBackupID();
- void initializeHolderReactor();
-
-public:
- std::unique_ptr<ServerBidiReactorStatus> handleRequest(
- backup::CreateNewBackupRequest request,
- backup::CreateNewBackupResponse *response) override;
- void terminateCallback() override;
-};
-
std::string CreateNewBackupReactor::generateBackupID() {
// mock
return generateRandomString();
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -2,16 +2,15 @@
#include "BlobGetClientReactor.h"
#include "DatabaseEntitiesTools.h"
-#include "DatabaseManager.h"
-#include "ServerWriteReactorBase.h"
#include "ServiceBlobClient.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
+#include "ServerWriteReactorBase.h"
+
#include <folly/MPMCQueue.h>
-#include <iostream>
#include <memory>
#include <string>
#include <vector>
@@ -51,118 +50,6 @@
void terminateCallback() override;
};
-PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request)
- : ServerWriteReactorBase<
- backup::PullBackupRequest,
- backup::PullBackupResponse>(request),
- dataChunks(std::make_shared<folly::MPMCQueue<std::string>>(100)) {
-}
-
-void PullBackupReactor::initializeGetReactor(const std::string &holder) {
- if (this->backupItem == nullptr) {
- throw std::runtime_error(
- "get reactor cannot be initialized when backup item is missing");
- }
- this->getReactor.reset(
- new reactor::BlobGetClientReactor(holder, this->dataChunks));
- this->getReactor->request.set_holder(holder);
- this->blobClient.get(this->getReactor);
-}
-
-void PullBackupReactor::initialize() {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->request.userid().empty()) {
- throw std::runtime_error("no user id provided");
- }
- if (this->request.backupid().empty()) {
- throw std::runtime_error("no backup id provided");
- }
- this->backupItem = database::DatabaseManager::getInstance().findBackupItem(
- this->request.userid(), this->request.backupid());
- if (this->backupItem == nullptr) {
- throw std::runtime_error(
- "no backup found for provided parameters: user id [" +
- this->request.userid() + "], backup id [" + this->request.backupid() +
- "]");
- }
- this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup(
- this->request.backupid());
-}
-
-std::unique_ptr<grpc::Status>
-PullBackupReactor::writeResponse(backup::PullBackupResponse *response) {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->state == State::COMPACTION) {
- if (this->getReactor == nullptr) {
- this->initializeGetReactor(this->backupItem->getCompactionHolder());
- }
- std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!dataChunk.empty()) {
- response->set_compactionchunk(dataChunk);
- return nullptr;
- } else {
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error(
- "dangling data discovered after reading compaction");
- }
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
- }
- this->state = State::LOGS;
- }
- }
- if (this->state == State::LOGS) {
- // TODO make sure logs are received in correct order regardless their size
- if (this->logs.empty()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- if (this->currentLogIndex == this->logs.size()) {
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error(
- "dangling data discovered after reading logs");
- }
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- } else if (this->currentLogIndex > this->logs.size()) {
- throw std::runtime_error("log index out of bound");
- }
- if (this->currentLog == nullptr) {
- this->currentLog = this->logs.at(this->currentLogIndex);
- if (this->currentLog->getPersistedInBlob()) {
- this->initializeGetReactor(this->currentLog->getValue());
- } else {
- response->set_logchunk(this->currentLog->getValue());
- ++this->currentLogIndex;
- this->currentLog = nullptr;
- return nullptr;
- }
- }
- std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
- }
- if (dataChunk.empty()) {
- ++this->currentLogIndex;
- this->currentLog = nullptr;
- } else {
- response->set_logchunk(dataChunk);
- }
- return nullptr;
- }
- throw std::runtime_error("unhandled state");
-}
-
-void PullBackupReactor::terminateCallback() {
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
- }
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.cpp
copy from services/backup/src/Reactors/server/PullBackupReactor.h
copy to services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -1,56 +1,13 @@
-#pragma once
+#include "PullBackupReactor.h"
-#include "BlobGetClientReactor.h"
-#include "DatabaseEntitiesTools.h"
#include "DatabaseManager.h"
-#include "ServerWriteReactorBase.h"
-#include "ServiceBlobClient.h"
-
-#include "../_generated/backup.grpc.pb.h"
-#include "../_generated/backup.pb.h"
-
-#include <folly/MPMCQueue.h>
#include <iostream>
-#include <memory>
-#include <string>
-#include <vector>
namespace comm {
namespace network {
namespace reactor {
-class PullBackupReactor : public ServerWriteReactorBase<
- backup::PullBackupRequest,
- backup::PullBackupResponse> {
-
- enum class State {
- COMPACTION = 1,
- LOGS = 2,
- };
-
- std::shared_ptr<database::BackupItem> backupItem;
- std::shared_ptr<reactor::BlobGetClientReactor> getReactor;
- std::mutex reactorStateMutex;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
- ServiceBlobClient blobClient;
- State state = State::COMPACTION;
- std::vector<std::shared_ptr<database::LogItem>> logs;
- size_t currentLogIndex = 0;
- std::shared_ptr<database::LogItem> currentLog;
-
- void initializeGetReactor(const std::string &holder);
-
-public:
- PullBackupReactor(const backup::PullBackupRequest *request);
-
- void initialize() override;
-
- std::unique_ptr<grpc::Status>
- writeResponse(backup::PullBackupResponse *response) override;
- void terminateCallback() override;
-};
-
PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request)
: ServerWriteReactorBase<
backup::PullBackupRequest,
@@ -116,44 +73,43 @@
this->state = State::LOGS;
}
}
- if (this->state == State::LOGS) {
- // TODO make sure logs are received in correct order regardless their size
- if (this->logs.empty()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- if (this->currentLogIndex == this->logs.size()) {
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error(
- "dangling data discovered after reading logs");
- }
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- } else if (this->currentLogIndex > this->logs.size()) {
- throw std::runtime_error("log index out of bound");
- }
- if (this->currentLog == nullptr) {
- this->currentLog = this->logs.at(this->currentLogIndex);
- if (this->currentLog->getPersistedInBlob()) {
- this->initializeGetReactor(this->currentLog->getValue());
- } else {
- response->set_logchunk(this->currentLog->getValue());
- ++this->currentLogIndex;
- this->currentLog = nullptr;
- return nullptr;
- }
- }
- std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
+ if (this->state == State::LOGS) {
+ // TODO make sure logs are received in correct order regardless their size
+ if (this->logs.empty()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ }
+ if (this->currentLogIndex == this->logs.size()) {
+ if (!this->dataChunks->isEmpty()) {
+ throw std::runtime_error("dangling data discovered after reading logs");
}
- if (dataChunk.empty()) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
+ } else if (this->currentLogIndex > this->logs.size()) {
+ throw std::runtime_error("log index out of bound");
+ }
+ if (this->currentLog == nullptr) {
+ this->currentLog = this->logs.at(this->currentLogIndex);
+ if (this->currentLog->getPersistedInBlob()) {
+ this->initializeGetReactor(this->currentLog->getValue());
+ } else {
+ response->set_logchunk(this->currentLog->getValue());
++this->currentLogIndex;
this->currentLog = nullptr;
- } else {
- response->set_logchunk(dataChunk);
+ return nullptr;
}
- return nullptr;
}
+ std::string dataChunk;
+ this->dataChunks->blockingRead(dataChunk);
+ if (!this->getReactor->getStatus().ok()) {
+ throw std::runtime_error(this->getReactor->getStatus().error_message());
+ }
+ if (dataChunk.empty()) {
+ ++this->currentLogIndex;
+ this->currentLog = nullptr;
+ } else {
+ response->set_logchunk(dataChunk);
+ }
+ return nullptr;
+ }
throw std::runtime_error("unhandled state");
}
diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.h
@@ -1,14 +1,11 @@
#pragma once
-#include "Constants.h"
#include "ServerReadReactorBase.h"
#include "ServiceBlobClient.h"
-#include "Tools.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
-#include <iostream>
#include <memory>
#include <string>
@@ -59,9 +56,6 @@
void initializePutReactor();
void initializeHolderReactor();
- void storeInBlob(const std::string &data) {
- }
-
public:
using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
ServerReadReactorBase;
@@ -72,178 +66,6 @@
void terminateCallback() override;
};
-void SendLogReactor::storeInDatabase() {
- // TODO handle attachment holders
- database::LogItem logItem(
- this->backupID,
- this->generateLogID(),
- (this->persistenceMethod == PersistenceMethod::BLOB),
- this->value,
- {});
- database::DatabaseManager::getInstance().putLogItem(logItem);
-}
-
-std::string SendLogReactor::generateHolder() {
- // TODO replace mock
- return generateRandomString();
-}
-
-std::string SendLogReactor::generateLogID() {
- // TODO replace mock
- return generateRandomString();
-}
-
-void SendLogReactor::initializePutReactor() {
- if (this->value.empty()) {
- throw std::runtime_error(
- "put reactor cannot be initialized with empty value");
- }
- if (this->hash.empty()) {
- throw std::runtime_error(
- "put reactor cannot be initialized with empty hash");
- }
- if (this->putReactor == nullptr) {
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->value, this->hash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
- }
-}
-
-void SendLogReactor::initializeHolderReactor() {
- if (this->value.empty()) {
- throw std::runtime_error(
- "holder reactor cannot be initialized with empty value");
- }
- if (this->hash.empty()) {
- throw std::runtime_error(
- "holder reactor cannot be initialized with empty hash");
- }
- if (this->holderReactor == nullptr) {
- this->holderReactor =
- std::make_shared<reactor::BlobAppendHolderClientReactor>(
- this->value, this->hash, &this->blobAppendHolderDoneCV);
- this->blobClient.appendHolder(this->holderReactor);
- }
-}
-
-std::unique_ptr<grpc::Status>
-SendLogReactor::readRequest(backup::SendLogRequest request) {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- switch (this->state) {
- case State::USER_ID: {
- if (!request.has_userid()) {
- throw std::runtime_error("user id expected but not received");
- }
- this->userID = request.userid();
- this->state = State::BACKUP_ID;
- return nullptr;
- };
- case State::BACKUP_ID: {
- if (!request.has_backupid()) {
- throw std::runtime_error("backup id expected but not received");
- }
- this->backupID = request.backupid();
- this->state = State::LOG_HASH;
- return nullptr;
- };
- case State::LOG_HASH: {
- if (!request.has_loghash()) {
- throw std::runtime_error("log hash expected but not received");
- }
- this->hash = request.loghash();
- this->state = State::LOG_CHUNK;
- return nullptr;
- };
- case State::LOG_CHUNK: {
- if (!request.has_logdata()) {
- throw std::runtime_error("log data expected but not received");
- }
- std::unique_ptr<std::string> chunk =
- std::make_unique<std::string>(std::move(*request.mutable_logdata()));
- if (chunk->size() == 0) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- // decide if keep in DB or upload to blob
- if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) {
- if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
- this->persistenceMethod = PersistenceMethod::DB;
- this->value = std::move(*chunk);
- this->storeInDatabase();
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- } else if (this->persistenceMethod == PersistenceMethod::BLOB) {
- this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
- } else {
- throw std::runtime_error(
- "error - invalid persistence state for chunk smaller than "
- "database limit");
- }
- } else {
- if (this->persistenceMethod != PersistenceMethod::UNKNOWN &&
- this->persistenceMethod != PersistenceMethod::BLOB) {
- throw std::runtime_error(
- "error - invalid persistence state, uploading to blob should be "
- "continued but it is not");
- }
- if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
- this->persistenceMethod = PersistenceMethod::BLOB;
- }
- if (this->value.empty()) {
- this->value = this->generateHolder();
- }
- this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
- }
-
- return nullptr;
- };
- }
- throw std::runtime_error("send log - invalid state");
-}
-
-void SendLogReactor::terminateCallback() {
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
-
- if (this->persistenceMethod == PersistenceMethod::DB ||
- this->putReactor == nullptr) {
- return;
- }
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
- if (this->putReactor->isDone()) {
- if (!this->putReactor->getStatus().ok()) {
- throw std::runtime_error(this->putReactor->getStatus().error_message());
- }
- } else {
- this->blobPutDoneCV.wait(lockPut);
- }
- if (this->putReactor->getDataExists()) {
- this->initializeHolderReactor();
- std::unique_lock<std::mutex> lockHolder(this->blobAppendHolderDoneCVMutex);
- if (this->holderReactor->isDone()) {
- if (!this->holderReactor->getStatus().ok()) {
- throw std::runtime_error(
- this->holderReactor->getStatus().error_message());
- }
- } else {
- this->blobAppendHolderDoneCV.wait(lockHolder);
- }
- }
- // store in db only when we successfully upload chunks
- this->storeInDatabase();
-}
-
-void SendLogReactor::doneCallback() {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- // TODO implement
- std::cout << "receive logs done " << this->status.error_code() << "/"
- << this->status.error_message() << std::endl;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.cpp
copy from services/backup/src/Reactors/server/SendLogReactor.h
copy to services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -1,77 +1,15 @@
-#pragma once
+#include "SendLogReactor.h"
+#include "DatabaseManager.h"
#include "Constants.h"
-#include "ServerReadReactorBase.h"
-#include "ServiceBlobClient.h"
#include "Tools.h"
-#include "../_generated/backup.grpc.pb.h"
-#include "../_generated/backup.pb.h"
-
#include <iostream>
-#include <memory>
-#include <string>
namespace comm {
namespace network {
namespace reactor {
-class SendLogReactor : public ServerReadReactorBase<
- backup::SendLogRequest,
- google::protobuf::Empty> {
- enum class State {
- USER_ID = 1,
- BACKUP_ID = 2,
- LOG_HASH = 3,
- LOG_CHUNK = 4,
- };
-
- enum class PersistenceMethod {
- UNKNOWN = 0,
- DB = 1,
- BLOB = 2,
- };
-
- State state = State::USER_ID;
- PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN;
- std::string userID;
- std::string backupID;
- std::string hash;
- // either the value itself which is a dump of a single operation (if
- // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is
- // true)
- std::string value;
- std::mutex reactorStateMutex;
-
- std::condition_variable blobPutDoneCV;
- std::mutex blobPutDoneCVMutex;
-
- std::condition_variable blobAppendHolderDoneCV;
- std::mutex blobAppendHolderDoneCVMutex;
-
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- std::shared_ptr<reactor::BlobAppendHolderClientReactor> holderReactor;
- ServiceBlobClient blobClient;
-
- void storeInDatabase();
- std::string generateHolder();
- std::string generateLogID();
- void initializePutReactor();
- void initializeHolderReactor();
-
- void storeInBlob(const std::string &data) {
- }
-
-public:
- using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
- ServerReadReactorBase;
-
- std::unique_ptr<grpc::Status>
- readRequest(backup::SendLogRequest request) override;
- void doneCallback() override;
- void terminateCallback() override;
-};
-
void SendLogReactor::storeInDatabase() {
// TODO handle attachment holders
database::LogItem logItem(

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 6:28 AM (19 h, 59 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2593061
Default Alt Text
D3780.id11608.diff (35 KB)

Event Timeline