diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt --- a/services/backup/CMakeLists.txt +++ b/services/backup/CMakeLists.txt @@ -36,7 +36,12 @@ set(DEV_HEADERS_PATH "./dev") endif() -file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") +file(GLOB SOURCE_CODE + "./src/*.cpp" + "./src/**/*.cpp" + "./src/**/**/*.cpp" + "./src/**/**/**/*.cpp" +) list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$") foreach (ITEM ${DEV_SOURCE_CODE}) diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h @@ -7,6 +7,7 @@ #include #include +#include namespace comm { namespace network { @@ -25,25 +26,10 @@ BlobAppendHolderClientReactor( const std::string &holder, const std::string &hash, - std::condition_variable *terminationNotifier) - : terminationNotifier(terminationNotifier) { - this->request.set_holder(holder); - this->request.set_blobhash(hash); - } - - void OnDone(const grpc::Status &status) { - this->status = status; - this->done = true; - this->terminationNotifier->notify_one(); - } - - bool isDone() const { - return this->done; - } - - grpc::Status getStatus() const { - return this->status; - } + std::condition_variable *terminationNotifier); + void OnDone(const grpc::Status &status); + bool isDone() const; + grpc::Status getStatus() const; }; } // namespace reactor diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp new file mode 100644 --- /dev/null +++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp @@ -0,0 +1,32 @@ +#include "BlobAppendHolderClientReactor.h" + +namespace comm { +namespace network { +namespace reactor { + +BlobAppendHolderClientReactor::BlobAppendHolderClientReactor( + const std::string &holder, + const std::string &hash, + std::condition_variable *terminationNotifier) + : terminationNotifier(terminationNotifier) { + this->request.set_holder(holder); + this->request.set_blobhash(hash); +} + +void BlobAppendHolderClientReactor::OnDone(const grpc::Status &status) { + this->status = status; + this->done = true; + this->terminationNotifier->notify_one(); +} + +bool BlobAppendHolderClientReactor::isDone() const { + return this->done; +} + +grpc::Status BlobAppendHolderClientReactor::getStatus() const { + return this->status; +} + +} // namespace reactor +} // namespace network +} // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -1,10 +1,10 @@ #pragma once -#include "ClientReadReactorBase.h" - #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "ClientReadReactorBase.h" + #include #include @@ -31,28 +31,6 @@ grpc::Status getStatus() const; }; -BlobGetClientReactor::BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks) - : holder(holder), dataChunks(dataChunks) { -} - -std::unique_ptr -BlobGetClientReactor::readResponse(blob::GetResponse &response) { - if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error("error reading data from the blob service"); - } - return nullptr; -} - -void BlobGetClientReactor::doneCallback() { - this->dataChunks->write(""); -} - -grpc::Status BlobGetClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp copy from services/backup/src/Reactors/client/blob/BlobGetClientReactor.h copy to services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -1,36 +1,10 @@ -#pragma once -#include "ClientReadReactorBase.h" - -#include "../_generated/blob.grpc.pb.h" -#include "../_generated/blob.pb.h" - -#include -#include - -#include -#include +#include "BlobGetClientReactor.h" namespace comm { namespace network { namespace reactor { -class BlobGetClientReactor - : public ClientReadReactorBase { - std::string holder; - std::shared_ptr> dataChunks; - -public: - BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks); - - std::unique_ptr - readResponse(blob::GetResponse &response) override; - void doneCallback() override; - grpc::Status getStatus() const; -}; - BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks) diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,16 +1,16 @@ #pragma once -#include "ClientBidiReactorBase.h" #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "ClientBidiReactorBase.h" + #include #include #include -#include #include #include @@ -51,62 +51,6 @@ bool getDataExists() const; }; -BlobPutClientReactor::BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier) - : holder(holder), - hash(hash), - dataChunks(folly::MPMCQueue(100)), - terminationNotifier(terminationNotifier) { -} - -void BlobPutClientReactor::scheduleSendingDataChunk( - std::unique_ptr dataChunk) { - if (!this->dataChunks.write(std::move(*dataChunk))) { - throw std::runtime_error( - "Error scheduling sending a data chunk to send to the blob service"); - } -} - -std::unique_ptr BlobPutClientReactor::prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) { - if (this->state == State::SEND_HOLDER) { - this->request.set_holder(this->holder); - this->state = State::SEND_HASH; - return nullptr; - } - if (this->state == State::SEND_HASH) { - request.set_blobhash(this->hash); - this->state = State::SEND_CHUNKS; - return nullptr; - } - if (previousResponse->dataexists()) { - this->dataExists = true; - return std::make_unique(grpc::Status::OK); - } - std::string dataChunk; - this->dataChunks.blockingRead(dataChunk); - if (dataChunk.empty()) { - return std::make_unique(grpc::Status::OK); - } - request.set_datachunk(dataChunk); - return nullptr; -} - -void BlobPutClientReactor::doneCallback() { - this->terminationNotifier->notify_one(); -} - -grpc::Status BlobPutClientReactor::getStatus() const { - return this->status; -} - -bool BlobPutClientReactor::getDataExists() const { - return this->dataExists; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp copy from services/backup/src/Reactors/client/blob/BlobPutClientReactor.h copy to services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -1,56 +1,11 @@ -#pragma once +#include "BlobPutClientReactor.h" -#include "ClientBidiReactorBase.h" -#include "Constants.h" - -#include "../_generated/blob.grpc.pb.h" -#include "../_generated/blob.pb.h" - -#include -#include - -#include #include -#include -#include namespace comm { namespace network { namespace reactor { -class BlobPutClientReactor - : public ClientBidiReactorBase { - - enum class State { - SEND_HOLDER = 0, - SEND_HASH = 1, - SEND_CHUNKS = 2, - }; - - State state = State::SEND_HOLDER; - const std::string hash; - const std::string holder; - size_t currentDataSize = 0; - const size_t chunkSize = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - folly::MPMCQueue dataChunks; - std::condition_variable *terminationNotifier; - bool dataExists = false; - -public: - BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier); - void scheduleSendingDataChunk(std::unique_ptr dataChunk); - std::unique_ptr prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) override; - void doneCallback() override; - grpc::Status getStatus() const; - bool getDataExists() const; -}; - BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash, diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h @@ -1,13 +1,12 @@ #pragma once -#include "DatabaseManager.h" -#include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" -#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include "ServerBidiReactorBase.h" + #include #include #include @@ -55,119 +54,6 @@ void terminateCallback() override; }; -std::string CreateNewBackupReactor::generateBackupID() { - // mock - return generateRandomString(); -} - -void CreateNewBackupReactor::initializeHolderReactor() { - if (this->holder.empty()) { - throw std::runtime_error( - "holder reactor cannot be initialized with empty holder"); - } - if (this->dataHash.empty()) { - throw std::runtime_error( - "holder reactor cannot be initialized with empty hash"); - } - if (this->holderReactor == nullptr) { - this->holderReactor = - std::make_shared( - this->holder, this->dataHash, &this->blobAppendHolderDoneCV); - this->blobClient.appendHolder(this->holderReactor); - } -} - -std::unique_ptr CreateNewBackupReactor::handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::KEY_ENTROPY; - return nullptr; - } - case State::KEY_ENTROPY: { - if (!request.has_keyentropy()) { - throw std::runtime_error( - "backup key entropy expected but not received"); - } - this->keyEntropy = request.keyentropy(); - this->state = State::DATA_HASH; - return nullptr; - } - case State::DATA_HASH: { - if (!request.has_newcompactionhash()) { - throw std::runtime_error("data hash expected but not received"); - } - this->dataHash = request.newcompactionhash(); - this->state = State::DATA_CHUNKS; - - // TODO confirm - holder may be a backup id - this->backupID = this->generateBackupID(); - response->set_backupid(this->backupID); - this->holder = this->backupID; - this->putReactor = std::make_shared( - this->holder, this->dataHash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - return nullptr; - } - case State::DATA_CHUNKS: { - this->putReactor->scheduleSendingDataChunk(std::make_unique( - std::move(*request.mutable_newcompactionchunk()))); - return nullptr; - } - } - throw std::runtime_error("new backup - invalid state"); -} - -void CreateNewBackupReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - if (this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->isDone()) { - if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); - } - } else { - this->blobPutDoneCV.wait(lock2); - } - if (this->putReactor->getDataExists()) { - this->initializeHolderReactor(); - std::unique_lock lockHolder(this->blobAppendHolderDoneCVMutex); - if (this->holderReactor->isDone()) { - if (!this->holderReactor->getStatus().ok()) { - throw std::runtime_error( - this->holderReactor->getStatus().error_message()); - } - } else { - this->blobAppendHolderDoneCV.wait(lockHolder); - } - } - try { - // TODO add recovery data - // TODO handle attachments holders - database::BackupItem backupItem( - this->userID, - this->backupID, - getCurrentTimestamp(), - generateRandomString(), - this->holder, - {}); - database::DatabaseManager::getInstance().putBackupItem(backupItem); - } catch (std::runtime_error &e) { - std::cout << "db operations error: " << e.what() << std::endl; - } -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp copy from services/backup/src/Reactors/server/CreateNewBackupReactor.h copy to services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,60 +1,12 @@ -#pragma once +#include "CreateNewBackupReactor.h" #include "DatabaseManager.h" -#include "ServerBidiReactorBase.h" -#include "ServiceBlobClient.h" #include "Tools.h" -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - -#include -#include -#include -#include - namespace comm { namespace network { namespace reactor { -class CreateNewBackupReactor : public ServerBidiReactorBase< - backup::CreateNewBackupRequest, - backup::CreateNewBackupResponse> { - enum class State { - USER_ID = 1, - KEY_ENTROPY = 2, - DATA_HASH = 3, - DATA_CHUNKS = 4, - }; - - State state = State::USER_ID; - std::string userID; - std::string keyEntropy; - std::string dataHash; - std::string holder; - std::string backupID; - std::shared_ptr putReactor; - std::shared_ptr holderReactor; - - ServiceBlobClient blobClient; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::condition_variable blobAppendHolderDoneCV; - std::mutex blobAppendHolderDoneCVMutex; - - std::string generateBackupID(); - void initializeHolderReactor(); - -public: - std::unique_ptr handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) override; - void terminateCallback() override; -}; - std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -2,16 +2,15 @@ #include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" -#include "DatabaseManager.h" -#include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include "ServerWriteReactorBase.h" + #include -#include #include #include #include @@ -51,118 +50,6 @@ void terminateCallback() override; }; -PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) - : ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse>(request), - dataChunks(std::make_shared>(100)) { -} - -void PullBackupReactor::initializeGetReactor(const std::string &holder) { - if (this->backupItem == nullptr) { - throw std::runtime_error( - "get reactor cannot be initialized when backup item is missing"); - } - this->getReactor.reset( - new reactor::BlobGetClientReactor(holder, this->dataChunks)); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); -} - -void PullBackupReactor::initialize() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->request.userid().empty()) { - throw std::runtime_error("no user id provided"); - } - if (this->request.backupid().empty()) { - throw std::runtime_error("no backup id provided"); - } - this->backupItem = database::DatabaseManager::getInstance().findBackupItem( - this->request.userid(), this->request.backupid()); - if (this->backupItem == nullptr) { - throw std::runtime_error( - "no backup found for provided parameters: user id [" + - this->request.userid() + "], backup id [" + this->request.backupid() + - "]"); - } - this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( - this->request.backupid()); -} - -std::unique_ptr -PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->state == State::COMPACTION) { - if (this->getReactor == nullptr) { - this->initializeGetReactor(this->backupItem->getCompactionHolder()); - } - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!dataChunk.empty()) { - response->set_compactionchunk(dataChunk); - return nullptr; - } else { - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading compaction"); - } - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } - this->state = State::LOGS; - } - } - if (this->state == State::LOGS) { - // TODO make sure logs are received in correct order regardless their size - if (this->logs.empty()) { - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex == this->logs.size()) { - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading logs"); - } - return std::make_unique(grpc::Status::OK); - } else if (this->currentLogIndex > this->logs.size()) { - throw std::runtime_error("log index out of bound"); - } - if (this->currentLog == nullptr) { - this->currentLog = this->logs.at(this->currentLogIndex); - if (this->currentLog->getPersistedInBlob()) { - this->initializeGetReactor(this->currentLog->getValue()); - } else { - response->set_logchunk(this->currentLog->getValue()); - ++this->currentLogIndex; - this->currentLog = nullptr; - return nullptr; - } - } - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } - if (dataChunk.empty()) { - ++this->currentLogIndex; - this->currentLog = nullptr; - } else { - response->set_logchunk(dataChunk); - } - return nullptr; - } - throw std::runtime_error("unhandled state"); -} - -void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.cpp copy from services/backup/src/Reactors/server/PullBackupReactor.h copy to services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,56 +1,13 @@ -#pragma once +#include "PullBackupReactor.h" -#include "BlobGetClientReactor.h" -#include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" -#include "ServerWriteReactorBase.h" -#include "ServiceBlobClient.h" - -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - -#include #include -#include -#include -#include namespace comm { namespace network { namespace reactor { -class PullBackupReactor : public ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse> { - - enum class State { - COMPACTION = 1, - LOGS = 2, - }; - - std::shared_ptr backupItem; - std::shared_ptr getReactor; - std::mutex reactorStateMutex; - std::shared_ptr> dataChunks; - ServiceBlobClient blobClient; - State state = State::COMPACTION; - std::vector> logs; - size_t currentLogIndex = 0; - std::shared_ptr currentLog; - - void initializeGetReactor(const std::string &holder); - -public: - PullBackupReactor(const backup::PullBackupRequest *request); - - void initialize() override; - - std::unique_ptr - writeResponse(backup::PullBackupResponse *response) override; - void terminateCallback() override; -}; - PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, @@ -116,44 +73,43 @@ this->state = State::LOGS; } } - if (this->state == State::LOGS) { - // TODO make sure logs are received in correct order regardless their size - if (this->logs.empty()) { - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex == this->logs.size()) { - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading logs"); - } - return std::make_unique(grpc::Status::OK); - } else if (this->currentLogIndex > this->logs.size()) { - throw std::runtime_error("log index out of bound"); - } - if (this->currentLog == nullptr) { - this->currentLog = this->logs.at(this->currentLogIndex); - if (this->currentLog->getPersistedInBlob()) { - this->initializeGetReactor(this->currentLog->getValue()); - } else { - response->set_logchunk(this->currentLog->getValue()); - ++this->currentLogIndex; - this->currentLog = nullptr; - return nullptr; - } - } - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (this->state == State::LOGS) { + // TODO make sure logs are received in correct order regardless their size + if (this->logs.empty()) { + return std::make_unique(grpc::Status::OK); + } + if (this->currentLogIndex == this->logs.size()) { + if (!this->dataChunks->isEmpty()) { + throw std::runtime_error("dangling data discovered after reading logs"); } - if (dataChunk.empty()) { + return std::make_unique(grpc::Status::OK); + } else if (this->currentLogIndex > this->logs.size()) { + throw std::runtime_error("log index out of bound"); + } + if (this->currentLog == nullptr) { + this->currentLog = this->logs.at(this->currentLogIndex); + if (this->currentLog->getPersistedInBlob()) { + this->initializeGetReactor(this->currentLog->getValue()); + } else { + response->set_logchunk(this->currentLog->getValue()); ++this->currentLogIndex; this->currentLog = nullptr; - } else { - response->set_logchunk(dataChunk); + return nullptr; } - return nullptr; } + std::string dataChunk; + this->dataChunks->blockingRead(dataChunk); + if (!this->getReactor->getStatus().ok()) { + throw std::runtime_error(this->getReactor->getStatus().error_message()); + } + if (dataChunk.empty()) { + ++this->currentLogIndex; + this->currentLog = nullptr; + } else { + response->set_logchunk(dataChunk); + } + return nullptr; + } throw std::runtime_error("unhandled state"); } diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,14 +1,11 @@ #pragma once -#include "Constants.h" #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" -#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" -#include #include #include @@ -59,9 +56,6 @@ void initializePutReactor(); void initializeHolderReactor(); - void storeInBlob(const std::string &data) { - } - public: using ServerReadReactorBase:: ServerReadReactorBase; @@ -72,178 +66,6 @@ void terminateCallback() override; }; -void SendLogReactor::storeInDatabase() { - // TODO handle attachment holders - database::LogItem logItem( - this->backupID, - this->generateLogID(), - (this->persistenceMethod == PersistenceMethod::BLOB), - this->value, - {}); - database::DatabaseManager::getInstance().putLogItem(logItem); -} - -std::string SendLogReactor::generateHolder() { - // TODO replace mock - return generateRandomString(); -} - -std::string SendLogReactor::generateLogID() { - // TODO replace mock - return generateRandomString(); -} - -void SendLogReactor::initializePutReactor() { - if (this->value.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty value"); - } - if (this->hash.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty hash"); - } - if (this->putReactor == nullptr) { - this->putReactor = std::make_shared( - this->value, this->hash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - } -} - -void SendLogReactor::initializeHolderReactor() { - if (this->value.empty()) { - throw std::runtime_error( - "holder reactor cannot be initialized with empty value"); - } - if (this->hash.empty()) { - throw std::runtime_error( - "holder reactor cannot be initialized with empty hash"); - } - if (this->holderReactor == nullptr) { - this->holderReactor = - std::make_shared( - this->value, this->hash, &this->blobAppendHolderDoneCV); - this->blobClient.appendHolder(this->holderReactor); - } -} - -std::unique_ptr -SendLogReactor::readRequest(backup::SendLogRequest request) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::BACKUP_ID; - return nullptr; - }; - case State::BACKUP_ID: { - if (!request.has_backupid()) { - throw std::runtime_error("backup id expected but not received"); - } - this->backupID = request.backupid(); - this->state = State::LOG_HASH; - return nullptr; - }; - case State::LOG_HASH: { - if (!request.has_loghash()) { - throw std::runtime_error("log hash expected but not received"); - } - this->hash = request.loghash(); - this->state = State::LOG_CHUNK; - return nullptr; - }; - case State::LOG_CHUNK: { - if (!request.has_logdata()) { - throw std::runtime_error("log data expected but not received"); - } - std::unique_ptr chunk = - std::make_unique(std::move(*request.mutable_logdata())); - if (chunk->size() == 0) { - return std::make_unique(grpc::Status::OK); - } - // decide if keep in DB or upload to blob - if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::DB; - this->value = std::move(*chunk); - this->storeInDatabase(); - return std::make_unique(grpc::Status::OK); - } else if (this->persistenceMethod == PersistenceMethod::BLOB) { - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } else { - throw std::runtime_error( - "error - invalid persistence state for chunk smaller than " - "database limit"); - } - } else { - if (this->persistenceMethod != PersistenceMethod::UNKNOWN && - this->persistenceMethod != PersistenceMethod::BLOB) { - throw std::runtime_error( - "error - invalid persistence state, uploading to blob should be " - "continued but it is not"); - } - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::BLOB; - } - if (this->value.empty()) { - this->value = this->generateHolder(); - } - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } - - return nullptr; - }; - } - throw std::runtime_error("send log - invalid state"); -} - -void SendLogReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - - if (this->persistenceMethod == PersistenceMethod::DB || - this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (this->putReactor->isDone()) { - if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); - } - } else { - this->blobPutDoneCV.wait(lockPut); - } - if (this->putReactor->getDataExists()) { - this->initializeHolderReactor(); - std::unique_lock lockHolder(this->blobAppendHolderDoneCVMutex); - if (this->holderReactor->isDone()) { - if (!this->holderReactor->getStatus().ok()) { - throw std::runtime_error( - this->holderReactor->getStatus().error_message()); - } - } else { - this->blobAppendHolderDoneCV.wait(lockHolder); - } - } - // store in db only when we successfully upload chunks - this->storeInDatabase(); -} - -void SendLogReactor::doneCallback() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - // TODO implement - std::cout << "receive logs done " << this->status.error_code() << "/" - << this->status.error_message() << std::endl; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.cpp copy from services/backup/src/Reactors/server/SendLogReactor.h copy to services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,77 +1,15 @@ -#pragma once +#include "SendLogReactor.h" +#include "DatabaseManager.h" #include "Constants.h" -#include "ServerReadReactorBase.h" -#include "ServiceBlobClient.h" #include "Tools.h" -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - #include -#include -#include namespace comm { namespace network { namespace reactor { -class SendLogReactor : public ServerReadReactorBase< - backup::SendLogRequest, - google::protobuf::Empty> { - enum class State { - USER_ID = 1, - BACKUP_ID = 2, - LOG_HASH = 3, - LOG_CHUNK = 4, - }; - - enum class PersistenceMethod { - UNKNOWN = 0, - DB = 1, - BLOB = 2, - }; - - State state = State::USER_ID; - PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; - std::string userID; - std::string backupID; - std::string hash; - // either the value itself which is a dump of a single operation (if - // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is - // true) - std::string value; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::condition_variable blobAppendHolderDoneCV; - std::mutex blobAppendHolderDoneCVMutex; - - std::shared_ptr putReactor; - std::shared_ptr holderReactor; - ServiceBlobClient blobClient; - - void storeInDatabase(); - std::string generateHolder(); - std::string generateLogID(); - void initializePutReactor(); - void initializeHolderReactor(); - - void storeInBlob(const std::string &data) { - } - -public: - using ServerReadReactorBase:: - ServerReadReactorBase; - - std::unique_ptr - readRequest(backup::SendLogRequest request) override; - void doneCallback() override; - void terminateCallback() override; -}; - void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem(