Page MenuHomePhabricator

D3780.diff
No OneTemporary

D3780.diff

diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt
--- a/services/backup/CMakeLists.txt
+++ b/services/backup/CMakeLists.txt
@@ -36,7 +36,7 @@
set(DEV_HEADERS_PATH "./dev")
endif()
-file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp")
+file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp")
list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$")
foreach (ITEM ${DEV_SOURCE_CODE})
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
@@ -1,10 +1,10 @@
#pragma once
-#include "ClientReadReactorBase.h"
-
#include "../_generated/blob.grpc.pb.h"
#include "../_generated/blob.pb.h"
+#include "ClientReadReactorBase.h"
+
#include <folly/MPMCQueue.h>
#include <grpcpp/grpcpp.h>
@@ -31,28 +31,6 @@
grpc::Status getStatus() const;
};
-BlobGetClientReactor::BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks)
- : holder(holder), dataChunks(dataChunks) {
-}
-
-std::unique_ptr<grpc::Status>
-BlobGetClientReactor::readResponse(blob::GetResponse &response) {
- if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) {
- throw std::runtime_error("error reading data from the blob service");
- }
- return nullptr;
-}
-
-void BlobGetClientReactor::doneCallback() {
- this->dataChunks->write("");
-}
-
-grpc::Status BlobGetClientReactor::getStatus() const {
- return this->status;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
copy from services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
copy to services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp
@@ -1,36 +1,10 @@
-#pragma once
-#include "ClientReadReactorBase.h"
-
-#include "../_generated/blob.grpc.pb.h"
-#include "../_generated/blob.pb.h"
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-#include <string>
+#include "BlobGetClientReactor.h"
namespace comm {
namespace network {
namespace reactor {
-class BlobGetClientReactor
- : public ClientReadReactorBase<blob::GetRequest, blob::GetResponse> {
- std::string holder;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
-
-public:
- BlobGetClientReactor(
- const std::string &holder,
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks);
-
- std::unique_ptr<grpc::Status>
- readResponse(blob::GetResponse &response) override;
- void doneCallback() override;
- grpc::Status getStatus() const;
-};
-
BlobGetClientReactor::BlobGetClientReactor(
const std::string &holder,
std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks)
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -1,16 +1,16 @@
#pragma once
-#include "ClientBidiReactorBase.h"
#include "Constants.h"
#include "../_generated/blob.grpc.pb.h"
#include "../_generated/blob.pb.h"
+#include "ClientBidiReactorBase.h"
+
#include <folly/MPMCQueue.h>
#include <grpcpp/grpcpp.h>
#include <condition_variable>
-#include <iostream>
#include <memory>
#include <string>
@@ -49,57 +49,6 @@
grpc::Status getStatus() const;
};
-BlobPutClientReactor::BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier)
- : holder(holder),
- hash(hash),
- dataChunks(folly::MPMCQueue<std::string>(100)),
- terminationNotifier(terminationNotifier) {
-}
-
-void BlobPutClientReactor::scheduleSendingDataChunk(
- std::unique_ptr<std::string> dataChunk) {
- if (!this->dataChunks.write(std::move(*dataChunk))) {
- throw std::runtime_error(
- "Error scheduling sending a data chunk to send to the blob service");
- }
-}
-
-std::unique_ptr<grpc::Status> BlobPutClientReactor::prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) {
- if (this->state == State::SEND_HOLDER) {
- this->request.set_holder(this->holder);
- this->state = State::SEND_HASH;
- return nullptr;
- }
- if (this->state == State::SEND_HASH) {
- request.set_blobhash(this->hash);
- this->state = State::SEND_CHUNKS;
- return nullptr;
- }
- if (previousResponse->dataexists()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- std::string dataChunk;
- this->dataChunks.blockingRead(dataChunk);
- if (dataChunk.empty()) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- request.set_datachunk(dataChunk);
- return nullptr;
-}
-
-void BlobPutClientReactor::doneCallback() {
- this->terminationNotifier->notify_one();
-}
-
-grpc::Status BlobPutClientReactor::getStatus() const {
- return this->status;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
copy from services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
copy to services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp
@@ -1,54 +1,11 @@
-#pragma once
+#include "BlobPutClientReactor.h"
-#include "ClientBidiReactorBase.h"
-#include "Constants.h"
-
-#include "../_generated/blob.grpc.pb.h"
-#include "../_generated/blob.pb.h"
-
-#include <folly/MPMCQueue.h>
-#include <grpcpp/grpcpp.h>
-
-#include <condition_variable>
#include <iostream>
-#include <memory>
-#include <string>
namespace comm {
namespace network {
namespace reactor {
-class BlobPutClientReactor
- : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
-
- enum class State {
- SEND_HOLDER = 0,
- SEND_HASH = 1,
- SEND_CHUNKS = 2,
- };
-
- State state = State::SEND_HOLDER;
- const std::string hash;
- const std::string holder;
- size_t currentDataSize = 0;
- const size_t chunkSize =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
- folly::MPMCQueue<std::string> dataChunks;
- std::condition_variable *terminationNotifier;
-
-public:
- BlobPutClientReactor(
- const std::string &holder,
- const std::string &hash,
- std::condition_variable *terminationNotifier);
- void scheduleSendingDataChunk(std::unique_ptr<std::string> dataChunk);
- std::unique_ptr<grpc::Status> prepareRequest(
- blob::PutRequest &request,
- std::shared_ptr<blob::PutResponse> previousResponse) override;
- void doneCallback() override;
- grpc::Status getStatus() const;
-};
-
BlobPutClientReactor::BlobPutClientReactor(
const std::string &holder,
const std::string &hash,
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h
@@ -1,13 +1,12 @@
#pragma once
-#include "DatabaseManager.h"
-#include "ServerBidiReactorBase.h"
#include "ServiceBlobClient.h"
-#include "Tools.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
+#include "ServerBidiReactorBase.h"
+
#include <condition_variable>
#include <memory>
#include <mutex>
@@ -50,88 +49,6 @@
void terminateCallback() override;
};
-std::string CreateNewBackupReactor::generateBackupID() {
- // mock
- return generateRandomString();
-}
-
-std::unique_ptr<ServerBidiReactorStatus> CreateNewBackupReactor::handleRequest(
- backup::CreateNewBackupRequest request,
- backup::CreateNewBackupResponse *response) {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- switch (this->state) {
- case State::USER_ID: {
- if (!request.has_userid()) {
- throw std::runtime_error("user id expected but not received");
- }
- this->userID = request.userid();
- this->state = State::KEY_ENTROPY;
- return nullptr;
- }
- case State::KEY_ENTROPY: {
- if (!request.has_keyentropy()) {
- throw std::runtime_error(
- "backup key entropy expected but not received");
- }
- this->keyEntropy = request.keyentropy();
- this->state = State::DATA_HASH;
- return nullptr;
- }
- case State::DATA_HASH: {
- if (!request.has_newcompactionhash()) {
- throw std::runtime_error("data hash expected but not received");
- }
- this->dataHash = request.newcompactionhash();
- this->state = State::DATA_CHUNKS;
-
- // TODO confirm - holder may be a backup id
- this->backupID = this->generateBackupID();
- response->set_backupid(this->backupID);
- this->holder = this->backupID;
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->holder, this->dataHash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
- return nullptr;
- }
- case State::DATA_CHUNKS: {
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(
- std::move(*request.mutable_newcompactionchunk())));
- return nullptr;
- }
- }
- throw std::runtime_error("new backup - invalid state");
-}
-
-void CreateNewBackupReactor::terminateCallback() {
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->putReactor == nullptr) {
- return;
- }
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lock2(this->blobPutDoneCVMutex);
- if (!this->putReactor->isDone()) {
- this->blobPutDoneCV.wait(lock2);
- } else if (!this->putReactor->getStatus().ok()) {
- throw std::runtime_error(this->putReactor->getStatus().error_message());
- }
- try {
- // TODO add recovery data
- // TODO handle attachments holders
- database::BackupItem backupItem(
- this->userID,
- this->backupID,
- getCurrentTimestamp(),
- generateRandomString(),
- this->holder,
- {});
- database::DatabaseManager::getInstance().putBackupItem(backupItem);
- } catch (std::runtime_error &e) {
- std::cout << "db operations error: " << e.what() << std::endl;
- }
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
copy from services/backup/src/Reactors/server/CreateNewBackupReactor.h
copy to services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -1,55 +1,12 @@
-#pragma once
+#include "CreateNewBackupReactor.h"
#include "DatabaseManager.h"
-#include "ServerBidiReactorBase.h"
-#include "ServiceBlobClient.h"
#include "Tools.h"
-#include "../_generated/backup.grpc.pb.h"
-#include "../_generated/backup.pb.h"
-
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <string>
-
namespace comm {
namespace network {
namespace reactor {
-class CreateNewBackupReactor : public ServerBidiReactorBase<
- backup::CreateNewBackupRequest,
- backup::CreateNewBackupResponse> {
- enum class State {
- USER_ID = 1,
- KEY_ENTROPY = 2,
- DATA_HASH = 3,
- DATA_CHUNKS = 4,
- };
-
- State state = State::USER_ID;
- std::string userID;
- std::string keyEntropy;
- std::string dataHash;
- std::string holder;
- std::string backupID;
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
-
- ServiceBlobClient blobClient;
- std::mutex reactorStateMutex;
-
- std::condition_variable blobPutDoneCV;
- std::mutex blobPutDoneCVMutex;
-
- std::string generateBackupID();
-
-public:
- std::unique_ptr<ServerBidiReactorStatus> handleRequest(
- backup::CreateNewBackupRequest request,
- backup::CreateNewBackupResponse *response) override;
- void terminateCallback() override;
-};
-
std::string CreateNewBackupReactor::generateBackupID() {
// mock
return generateRandomString();
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.h
@@ -2,16 +2,15 @@
#include "BlobGetClientReactor.h"
#include "DatabaseEntitiesTools.h"
-#include "DatabaseManager.h"
-#include "ServerWriteReactorBase.h"
#include "ServiceBlobClient.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
+#include "ServerWriteReactorBase.h"
+
#include <folly/MPMCQueue.h>
-#include <iostream>
#include <memory>
#include <string>
#include <vector>
@@ -51,138 +50,6 @@
void terminateCallback() override;
};
-PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request)
- : ServerWriteReactorBase<
- backup::PullBackupRequest,
- backup::PullBackupResponse>(request),
- dataChunks(std::make_shared<folly::MPMCQueue<std::string>>(100)) {
-}
-
-void PullBackupReactor::initializeGetReactor(const std::string &holder) {
- if (this->backupItem == nullptr) {
- throw std::runtime_error(
- "get reactor cannot be initialized when backup item is missing");
- }
- this->getReactor.reset(
- new reactor::BlobGetClientReactor(holder, this->dataChunks));
- this->getReactor->request.set_holder(holder);
- this->blobClient.get(this->getReactor);
-}
-
-void PullBackupReactor::initialize() {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->request.userid().empty()) {
- throw std::runtime_error("no user id provided");
- }
- if (this->request.backupid().empty()) {
- throw std::runtime_error("no backup id provided");
- }
- this->backupItem = database::DatabaseManager::getInstance().findBackupItem(
- this->request.userid(), this->request.backupid());
- if (this->backupItem == nullptr) {
- throw std::runtime_error(
- "no backup found for provided parameters: user id [" +
- this->request.userid() + "], backup id [" + this->request.backupid() +
- "]");
- }
- this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup(
- this->request.backupid());
-}
-
-std::unique_ptr<grpc::Status>
-PullBackupReactor::writeResponse(backup::PullBackupResponse *response) {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->state == State::COMPACTION) {
- if (this->getReactor == nullptr) {
- this->initializeGetReactor(this->backupItem->getCompactionHolder());
- }
- std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!dataChunk.empty()) {
- response->set_compactionchunk(dataChunk);
- return nullptr;
- }
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error(
- "dangling data discovered after reading compaction");
- }
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
- }
- this->state = State::LOGS;
- }
- if (this->state == State::LOGS) {
- // TODO make sure logs are received in correct order regardless their size
- if (this->logs.empty()) {
- // this means that there are no logs at all so we just terminate with the
- // compaction
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- if (this->currentLogIndex == this->logs.size()) {
- // we reached the end of the logs collection so we just want to terminate
- // either we terminate with an error if we have some dangling data
- // or with success if we don't
- if (!this->dataChunks->isEmpty()) {
- throw std::runtime_error("dangling data discovered after reading logs");
- }
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- if (this->currentLogIndex > this->logs.size()) {
- // we went out of the scope of the logs collection, this should never
- // happen and should be perceived as an error
- throw std::runtime_error("log index out of bound");
- }
- // this means that we're not reading anything between invocations of
- // writeResponse
- // it is only not null when we read data in chunks
- if (this->currentLog == nullptr) {
- this->currentLog = this->logs.at(this->currentLogIndex);
- if (this->currentLog->getPersistedInBlob()) {
- // if the item is stored in the blob, we initialize the get reactor and
- // proceed
- this->initializeGetReactor(this->currentLog->getValue());
- } else {
- // if the item is persisted in the database, we just take it, send the
- // data to the client and reset currentLog so the next invocation of
- // writeResponse will take another one from the collection
- response->set_logchunk(this->currentLog->getValue());
- ++this->currentLogIndex;
- this->currentLog = nullptr;
- return nullptr;
- }
- }
- // we want to read the chunks from the blob through the get client until we
- // get an empty chunk - a sign of "end of chunks"
- std::string dataChunk;
- this->dataChunks->blockingRead(dataChunk);
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
- }
- // if we get an empty chunk, we reset the currentLog so we can read the next
- // one from the logs collection.
- // If there's data inside, we write it to the client and proceed.
- if (dataChunk.empty()) {
- ++this->currentLogIndex;
- this->currentLog = nullptr;
- return nullptr;
- } else {
- response->set_logchunk(dataChunk);
- }
- return nullptr;
- }
- throw std::runtime_error("unhandled state");
-}
-
-void PullBackupReactor::terminateCallback() {
- if (!this->getReactor->getStatus().ok()) {
- throw std::runtime_error(this->getReactor->getStatus().error_message());
- }
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.cpp
copy from services/backup/src/Reactors/server/PullBackupReactor.h
copy to services/backup/src/Reactors/server/PullBackupReactor.cpp
--- a/services/backup/src/Reactors/server/PullBackupReactor.h
+++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp
@@ -1,56 +1,13 @@
-#pragma once
+#include "PullBackupReactor.h"
-#include "BlobGetClientReactor.h"
-#include "DatabaseEntitiesTools.h"
#include "DatabaseManager.h"
-#include "ServerWriteReactorBase.h"
-#include "ServiceBlobClient.h"
-
-#include "../_generated/backup.grpc.pb.h"
-#include "../_generated/backup.pb.h"
-
-#include <folly/MPMCQueue.h>
#include <iostream>
-#include <memory>
-#include <string>
-#include <vector>
namespace comm {
namespace network {
namespace reactor {
-class PullBackupReactor : public ServerWriteReactorBase<
- backup::PullBackupRequest,
- backup::PullBackupResponse> {
-
- enum class State {
- COMPACTION = 1,
- LOGS = 2,
- };
-
- std::shared_ptr<database::BackupItem> backupItem;
- std::shared_ptr<reactor::BlobGetClientReactor> getReactor;
- std::mutex reactorStateMutex;
- std::shared_ptr<folly::MPMCQueue<std::string>> dataChunks;
- ServiceBlobClient blobClient;
- State state = State::COMPACTION;
- std::vector<std::shared_ptr<database::LogItem>> logs;
- size_t currentLogIndex = 0;
- std::shared_ptr<database::LogItem> currentLog;
-
- void initializeGetReactor(const std::string &holder);
-
-public:
- PullBackupReactor(const backup::PullBackupRequest *request);
-
- void initialize() override;
-
- std::unique_ptr<grpc::Status>
- writeResponse(backup::PullBackupResponse *response) override;
- void terminateCallback() override;
-};
-
PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request)
: ServerWriteReactorBase<
backup::PullBackupRequest,
diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.h
@@ -1,14 +1,11 @@
#pragma once
-#include "Constants.h"
#include "ServerReadReactorBase.h"
#include "ServiceBlobClient.h"
-#include "Tools.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
-#include <iostream>
#include <memory>
#include <string>
@@ -54,9 +51,6 @@
std::string generateLogID();
void initializePutReactor();
- void storeInBlob(const std::string &data) {
- }
-
public:
using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
ServerReadReactorBase;
@@ -67,147 +61,6 @@
void terminateCallback() override;
};
-void SendLogReactor::storeInDatabase() {
- // TODO handle attachment holders
- database::LogItem logItem(
- this->backupID,
- this->generateLogID(),
- (this->persistenceMethod == PersistenceMethod::BLOB),
- this->value,
- {});
- database::DatabaseManager::getInstance().putLogItem(logItem);
-}
-
-std::string SendLogReactor::generateHolder() {
- // TODO replace mock
- return generateRandomString();
-}
-
-std::string SendLogReactor::generateLogID() {
- // TODO replace mock
- return generateRandomString();
-}
-
-void SendLogReactor::initializePutReactor() {
- if (this->value.empty()) {
- throw std::runtime_error(
- "put reactor cannot be initialized with empty value");
- }
- if (this->hash.empty()) {
- throw std::runtime_error(
- "put reactor cannot be initialized with empty hash");
- }
- if (this->putReactor == nullptr) {
- this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->value, this->hash, &this->blobPutDoneCV);
- this->blobClient.put(this->putReactor);
- }
-}
-
-std::unique_ptr<grpc::Status>
-SendLogReactor::readRequest(backup::SendLogRequest request) {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- switch (this->state) {
- case State::USER_ID: {
- if (!request.has_userid()) {
- throw std::runtime_error("user id expected but not received");
- }
- this->userID = request.userid();
- this->state = State::BACKUP_ID;
- return nullptr;
- };
- case State::BACKUP_ID: {
- if (!request.has_backupid()) {
- throw std::runtime_error("backup id expected but not received");
- }
- this->backupID = request.backupid();
- this->state = State::LOG_HASH;
- return nullptr;
- };
- case State::LOG_HASH: {
- if (!request.has_loghash()) {
- throw std::runtime_error("log hash expected but not received");
- }
- this->hash = request.loghash();
- this->state = State::LOG_CHUNK;
- return nullptr;
- };
- case State::LOG_CHUNK: {
- if (!request.has_logdata()) {
- throw std::runtime_error("log data expected but not received");
- }
- std::unique_ptr<std::string> chunk =
- std::make_unique<std::string>(std::move(*request.mutable_logdata()));
- if (chunk->size() == 0) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
- // decide if keep in DB or upload to blob
- if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) {
- if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
- this->persistenceMethod = PersistenceMethod::DB;
- this->value = std::move(*chunk);
- this->storeInDatabase();
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- } else if (this->persistenceMethod == PersistenceMethod::BLOB) {
- this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
- } else {
- throw std::runtime_error(
- "error - invalid persistence state for chunk smaller than "
- "database limit");
- }
- } else {
- if (this->persistenceMethod != PersistenceMethod::UNKNOWN &&
- this->persistenceMethod != PersistenceMethod::BLOB) {
- throw std::runtime_error(
- "error - invalid persistence state, uploading to blob should be "
- "continued but it is not");
- }
- if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
- this->persistenceMethod = PersistenceMethod::BLOB;
- }
- if (this->value.empty()) {
- this->value = this->generateHolder();
- }
- this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
- }
-
- return nullptr;
- };
- }
- throw std::runtime_error("send log - invalid state");
-}
-
-void SendLogReactor::terminateCallback() {
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
-
- if (this->persistenceMethod == PersistenceMethod::DB ||
- this->putReactor == nullptr) {
- return;
- }
- this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
- std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
- if (!this->putReactor->isDone()) {
- this->blobPutDoneCV.wait(lockPut);
- } else if (!this->putReactor->getStatus().ok()) {
- throw std::runtime_error(this->putReactor->getStatus().error_message());
- }
- // store in db only when we successfully upload chunks
- this->storeInDatabase();
-}
-
-void SendLogReactor::doneCallback() {
- // we make sure that the blob client's state is flushed to the main memory
- // as there may be multiple threads from the pool taking over here
- const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- // TODO implement
- std::cout << "receive logs done " << this->status.error_code() << "/"
- << this->status.error_message() << std::endl;
-}
-
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.cpp
copy from services/backup/src/Reactors/server/SendLogReactor.h
copy to services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -1,72 +1,15 @@
-#pragma once
+#include "SendLogReactor.h"
#include "Constants.h"
-#include "ServerReadReactorBase.h"
-#include "ServiceBlobClient.h"
+#include "DatabaseManager.h"
#include "Tools.h"
-#include "../_generated/backup.grpc.pb.h"
-#include "../_generated/backup.pb.h"
-
#include <iostream>
-#include <memory>
-#include <string>
namespace comm {
namespace network {
namespace reactor {
-class SendLogReactor : public ServerReadReactorBase<
- backup::SendLogRequest,
- google::protobuf::Empty> {
- enum class State {
- USER_ID = 1,
- BACKUP_ID = 2,
- LOG_HASH = 3,
- LOG_CHUNK = 4,
- };
-
- enum class PersistenceMethod {
- UNKNOWN = 0,
- DB = 1,
- BLOB = 2,
- };
-
- State state = State::USER_ID;
- PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN;
- std::string userID;
- std::string backupID;
- std::string hash;
- // either the value itself which is a dump of a single operation (if
- // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is
- // true)
- std::string value;
- std::mutex reactorStateMutex;
-
- std::condition_variable blobPutDoneCV;
- std::mutex blobPutDoneCVMutex;
-
- std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
- ServiceBlobClient blobClient;
-
- void storeInDatabase();
- std::string generateHolder();
- std::string generateLogID();
- void initializePutReactor();
-
- void storeInBlob(const std::string &data) {
- }
-
-public:
- using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
- ServerReadReactorBase;
-
- std::unique_ptr<grpc::Status>
- readRequest(backup::SendLogRequest request) override;
- void doneCallback() override;
- void terminateCallback() override;
-};
-
void SendLogReactor::storeInDatabase() {
// TODO handle attachment holders
database::LogItem logItem(

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 2:56 AM (20 h, 14 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2592525
Default Alt Text
D3780.diff (29 KB)

Event Timeline