diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt --- a/services/backup/CMakeLists.txt +++ b/services/backup/CMakeLists.txt @@ -36,7 +36,7 @@ set(DEV_HEADERS_PATH "./dev") endif() -file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") +file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$") foreach (ITEM ${DEV_SOURCE_CODE}) diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -1,10 +1,10 @@ #pragma once -#include "ClientReadReactorBase.h" - #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "ClientReadReactorBase.h" + #include #include @@ -31,28 +31,6 @@ grpc::Status getStatus() const; }; -BlobGetClientReactor::BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks) - : holder(holder), dataChunks(dataChunks) { -} - -std::unique_ptr -BlobGetClientReactor::readResponse(blob::GetResponse &response) { - if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error("error reading data from the blob service"); - } - return nullptr; -} - -void BlobGetClientReactor::doneCallback() { - this->dataChunks->write(""); -} - -grpc::Status BlobGetClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp copy from services/backup/src/Reactors/client/blob/BlobGetClientReactor.h copy to services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -1,36 +1,10 @@ -#pragma once -#include "ClientReadReactorBase.h" - -#include "../_generated/blob.grpc.pb.h" -#include "../_generated/blob.pb.h" - -#include -#include - -#include -#include +#include "BlobGetClientReactor.h" namespace comm { namespace network { namespace reactor { -class BlobGetClientReactor - : public ClientReadReactorBase { - std::string holder; - std::shared_ptr> dataChunks; - -public: - BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks); - - std::unique_ptr - readResponse(blob::GetResponse &response) override; - void doneCallback() override; - grpc::Status getStatus() const; -}; - BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks) diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,16 +1,16 @@ #pragma once -#include "ClientBidiReactorBase.h" #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "ClientBidiReactorBase.h" + #include #include #include -#include #include #include @@ -49,57 +49,6 @@ grpc::Status getStatus() const; }; -BlobPutClientReactor::BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier) - : holder(holder), - hash(hash), - dataChunks(folly::MPMCQueue(100)), - terminationNotifier(terminationNotifier) { -} - -void BlobPutClientReactor::scheduleSendingDataChunk( - std::unique_ptr dataChunk) { - if (!this->dataChunks.write(std::move(*dataChunk))) { - throw std::runtime_error( - "Error scheduling sending a data chunk to send to the blob service"); - } -} - -std::unique_ptr BlobPutClientReactor::prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) { - if (this->state == State::SEND_HOLDER) { - this->request.set_holder(this->holder); - this->state = State::SEND_HASH; - return nullptr; - } - if (this->state == State::SEND_HASH) { - request.set_blobhash(this->hash); - this->state = State::SEND_CHUNKS; - return nullptr; - } - if (previousResponse->dataexists()) { - return std::make_unique(grpc::Status::OK); - } - std::string dataChunk; - this->dataChunks.blockingRead(dataChunk); - if (dataChunk.empty()) { - return std::make_unique(grpc::Status::OK); - } - request.set_datachunk(dataChunk); - return nullptr; -} - -void BlobPutClientReactor::doneCallback() { - this->terminationNotifier->notify_one(); -} - -grpc::Status BlobPutClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp copy from services/backup/src/Reactors/client/blob/BlobPutClientReactor.h copy to services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -1,54 +1,11 @@ -#pragma once +#include "BlobPutClientReactor.h" -#include "ClientBidiReactorBase.h" -#include "Constants.h" - -#include "../_generated/blob.grpc.pb.h" -#include "../_generated/blob.pb.h" - -#include -#include - -#include #include -#include -#include namespace comm { namespace network { namespace reactor { -class BlobPutClientReactor - : public ClientBidiReactorBase { - - enum class State { - SEND_HOLDER = 0, - SEND_HASH = 1, - SEND_CHUNKS = 2, - }; - - State state = State::SEND_HOLDER; - const std::string hash; - const std::string holder; - size_t currentDataSize = 0; - const size_t chunkSize = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - folly::MPMCQueue dataChunks; - std::condition_variable *terminationNotifier; - -public: - BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier); - void scheduleSendingDataChunk(std::unique_ptr dataChunk); - std::unique_ptr prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) override; - void doneCallback() override; - grpc::Status getStatus() const; -}; - BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash, diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h @@ -1,13 +1,12 @@ #pragma once -#include "DatabaseManager.h" -#include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" -#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include "ServerBidiReactorBase.h" + #include #include #include @@ -50,89 +49,6 @@ void terminateCallback() override; }; -std::string CreateNewBackupReactor::generateBackupID() { - // mock - return generateRandomString(); -} - -std::unique_ptr CreateNewBackupReactor::handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::KEY_ENTROPY; - return nullptr; - } - case State::KEY_ENTROPY: { - if (!request.has_keyentropy()) { - throw std::runtime_error( - "backup key entropy expected but not received"); - } - this->keyEntropy = request.keyentropy(); - this->state = State::DATA_HASH; - return nullptr; - } - case State::DATA_HASH: { - if (!request.has_newcompactionhash()) { - throw std::runtime_error("data hash expected but not received"); - } - this->dataHash = request.newcompactionhash(); - this->state = State::DATA_CHUNKS; - - // TODO confirm - holder may be a backup id - this->backupID = this->generateBackupID(); - response->set_backupid(this->backupID); - this->holder = this->backupID; - this->putReactor = std::make_shared( - this->holder, this->dataHash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - return nullptr; - } - case State::DATA_CHUNKS: { - this->putReactor->scheduleSendingDataChunk(std::make_unique( - std::move(*request.mutable_newcompactionchunk()))); - return nullptr; - } - } - throw std::runtime_error("new backup - invalid state"); -} - -void CreateNewBackupReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - if (this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->isDone() && !this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); - } - if (!this->putReactor->isDone()) { - this->blobPutDoneCV.wait(lock2); - } - try { - // TODO add recovery data - // TODO handle attachments holders - database::BackupItem backupItem( - this->userID, - this->backupID, - getCurrentTimestamp(), - generateRandomString(), - this->holder, - {}); - database::DatabaseManager::getInstance().putBackupItem(backupItem); - } catch (std::runtime_error &e) { - std::cout << "db operations error: " << e.what() << std::endl; - } -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp copy from services/backup/src/Reactors/server/CreateNewBackupReactor.h copy to services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,55 +1,12 @@ -#pragma once +#include "CreateNewBackupReactor.h" #include "DatabaseManager.h" -#include "ServerBidiReactorBase.h" -#include "ServiceBlobClient.h" #include "Tools.h" -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - -#include -#include -#include -#include - namespace comm { namespace network { namespace reactor { -class CreateNewBackupReactor : public ServerBidiReactorBase< - backup::CreateNewBackupRequest, - backup::CreateNewBackupResponse> { - enum class State { - USER_ID = 1, - KEY_ENTROPY = 2, - DATA_HASH = 3, - DATA_CHUNKS = 4, - }; - - State state = State::USER_ID; - std::string userID; - std::string keyEntropy; - std::string dataHash; - std::string holder; - std::string backupID; - std::shared_ptr putReactor; - - ServiceBlobClient blobClient; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::string generateBackupID(); - -public: - std::unique_ptr handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) override; - void terminateCallback() override; -}; - std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -2,16 +2,15 @@ #include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" -#include "DatabaseManager.h" -#include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include "ServerWriteReactorBase.h" + #include -#include #include #include #include @@ -51,138 +50,6 @@ void terminateCallback() override; }; -PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) - : ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse>(request), - dataChunks(std::make_shared>(100)) { -} - -void PullBackupReactor::initializeGetReactor(const std::string &holder) { - if (this->backupItem == nullptr) { - throw std::runtime_error( - "get reactor cannot be initialized when backup item is missing"); - } - this->getReactor.reset( - new reactor::BlobGetClientReactor(holder, this->dataChunks)); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); -} - -void PullBackupReactor::initialize() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->request.userid().empty()) { - throw std::runtime_error("no user id provided"); - } - if (this->request.backupid().empty()) { - throw std::runtime_error("no backup id provided"); - } - this->backupItem = database::DatabaseManager::getInstance().findBackupItem( - this->request.userid(), this->request.backupid()); - if (this->backupItem == nullptr) { - throw std::runtime_error( - "no backup found for provided parameters: user id [" + - this->request.userid() + "], backup id [" + this->request.backupid() + - "]"); - } - this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( - this->request.backupid()); -} - -std::unique_ptr -PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->state == State::COMPACTION) { - if (this->getReactor == nullptr) { - this->initializeGetReactor(this->backupItem->getCompactionHolder()); - } - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!dataChunk.empty()) { - response->set_compactionchunk(dataChunk); - return nullptr; - } - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading compaction"); - } - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } - this->state = State::LOGS; - } - if (this->state == State::LOGS) { - // TODO make sure logs are received in correct order regardless their size - if (this->logs.empty()) { - // this means that there are no logs at all so we just terminate with the - // compaction - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex == this->logs.size()) { - // we reached the end of the logs collection so we just want to terminate - // either we terminate with an error if we have some dangling data - // or with success if we don't - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error("dangling data discovered after reading logs"); - } - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex > this->logs.size()) { - // we went out of the scope of the logs collection, this should never - // happen and should be perceived as an error - throw std::runtime_error("log index out of bound"); - } - // this means that we're not reading anything between invocations of - // writeResponse - // it is only not null when we read data in chunks - if (this->currentLog == nullptr) { - this->currentLog = this->logs.at(this->currentLogIndex); - if (this->currentLog->getPersistedInBlob()) { - // if the item is stored in the blob, we initialize the get reactor and - // proceed - this->initializeGetReactor(this->currentLog->getValue()); - } else { - // if the item is persisted in the database, we just take it, send the - // data to the client and reset currentLog so the next invocation of - // writeResponse will take another one from the collection - response->set_logchunk(this->currentLog->getValue()); - ++this->currentLogIndex; - this->currentLog = nullptr; - return nullptr; - } - } - // we want to read the chunks from the blob through the get client until we - // get an empty chunk - a sign of "end of chunks" - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } - // if we get an empty chunk, we reset the currentLog so we can read the next - // one from the logs collection. - // If there's data inside, we write it to the client and proceed. - if (dataChunk.empty()) { - ++this->currentLogIndex; - this->currentLog = nullptr; - return nullptr; - } else { - response->set_logchunk(dataChunk); - } - return nullptr; - } - throw std::runtime_error("unhandled state"); -} - -void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.cpp copy from services/backup/src/Reactors/server/PullBackupReactor.h copy to services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,56 +1,13 @@ -#pragma once +#include "PullBackupReactor.h" -#include "BlobGetClientReactor.h" -#include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" -#include "ServerWriteReactorBase.h" -#include "ServiceBlobClient.h" - -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - -#include #include -#include -#include -#include namespace comm { namespace network { namespace reactor { -class PullBackupReactor : public ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse> { - - enum class State { - COMPACTION = 1, - LOGS = 2, - }; - - std::shared_ptr backupItem; - std::shared_ptr getReactor; - std::mutex reactorStateMutex; - std::shared_ptr> dataChunks; - ServiceBlobClient blobClient; - State state = State::COMPACTION; - std::vector> logs; - size_t currentLogIndex = 0; - std::shared_ptr currentLog; - - void initializeGetReactor(const std::string &holder); - -public: - PullBackupReactor(const backup::PullBackupRequest *request); - - void initialize() override; - - std::unique_ptr - writeResponse(backup::PullBackupResponse *response) override; - void terminateCallback() override; -}; - PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,14 +1,11 @@ #pragma once -#include "Constants.h" #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" -#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" -#include #include #include @@ -54,9 +51,6 @@ std::string generateLogID(); void initializePutReactor(); - void storeInBlob(const std::string &data) { - } - public: using ServerReadReactorBase:: ServerReadReactorBase; @@ -67,147 +61,6 @@ void terminateCallback() override; }; -void SendLogReactor::storeInDatabase() { - // TODO handle attachment holders - database::LogItem logItem( - this->backupID, - this->generateLogID(), - (this->persistenceMethod == PersistenceMethod::BLOB), - this->value, - {}); - database::DatabaseManager::getInstance().putLogItem(logItem); -} - -std::string SendLogReactor::generateHolder() { - // TODO replace mock - return generateRandomString(); -} - -std::string SendLogReactor::generateLogID() { - // TODO replace mock - return generateRandomString(); -} - -void SendLogReactor::initializePutReactor() { - if (this->value.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty value"); - } - if (this->hash.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty hash"); - } - if (this->putReactor == nullptr) { - this->putReactor = std::make_shared( - this->value, this->hash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - } -} - -std::unique_ptr -SendLogReactor::readRequest(backup::SendLogRequest request) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::BACKUP_ID; - return nullptr; - }; - case State::BACKUP_ID: { - if (!request.has_backupid()) { - throw std::runtime_error("backup id expected but not received"); - } - this->backupID = request.backupid(); - this->state = State::LOG_HASH; - return nullptr; - }; - case State::LOG_HASH: { - if (!request.has_loghash()) { - throw std::runtime_error("log hash expected but not received"); - } - this->hash = request.loghash(); - this->state = State::LOG_CHUNK; - return nullptr; - }; - case State::LOG_CHUNK: { - if (!request.has_logdata()) { - throw std::runtime_error("log data expected but not received"); - } - std::unique_ptr chunk = - std::make_unique(std::move(*request.mutable_logdata())); - if (chunk->size() == 0) { - return std::make_unique(grpc::Status::OK); - } - // decide if keep in DB or upload to blob - if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::DB; - this->value = std::move(*chunk); - this->storeInDatabase(); - return std::make_unique(grpc::Status::OK); - } else if (this->persistenceMethod == PersistenceMethod::BLOB) { - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } else { - throw std::runtime_error( - "error - invalid persistence state for chunk smaller than " - "database limit"); - } - } else { - if (this->persistenceMethod != PersistenceMethod::UNKNOWN && - this->persistenceMethod != PersistenceMethod::BLOB) { - throw std::runtime_error( - "error - invalid persistence state, uploading to blob should be " - "continued but it is not"); - } - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::BLOB; - } - if (this->value.empty()) { - this->value = this->generateHolder(); - } - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } - - return nullptr; - }; - } - throw std::runtime_error("send log - invalid state"); -} - -void SendLogReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - - if (this->persistenceMethod == PersistenceMethod::DB || - this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { - this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); - } - // store in db only when we successfully upload chunks - this->storeInDatabase(); -} - -void SendLogReactor::doneCallback() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - // TODO implement - std::cout << "receive logs done " << this->status.error_code() << "/" - << this->status.error_message() << std::endl; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.cpp copy from services/backup/src/Reactors/server/SendLogReactor.h copy to services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,72 +1,15 @@ -#pragma once +#include "SendLogReactor.h" #include "Constants.h" -#include "ServerReadReactorBase.h" -#include "ServiceBlobClient.h" +#include "DatabaseManager.h" #include "Tools.h" -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - #include -#include -#include namespace comm { namespace network { namespace reactor { -class SendLogReactor : public ServerReadReactorBase< - backup::SendLogRequest, - google::protobuf::Empty> { - enum class State { - USER_ID = 1, - BACKUP_ID = 2, - LOG_HASH = 3, - LOG_CHUNK = 4, - }; - - enum class PersistenceMethod { - UNKNOWN = 0, - DB = 1, - BLOB = 2, - }; - - State state = State::USER_ID; - PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; - std::string userID; - std::string backupID; - std::string hash; - // either the value itself which is a dump of a single operation (if - // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is - // true) - std::string value; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::shared_ptr putReactor; - ServiceBlobClient blobClient; - - void storeInDatabase(); - std::string generateHolder(); - std::string generateLogID(); - void initializePutReactor(); - - void storeInBlob(const std::string &data) { - } - -public: - using ServerReadReactorBase:: - ServerReadReactorBase; - - std::unique_ptr - readRequest(backup::SendLogRequest request) override; - void doneCallback() override; - void terminateCallback() override; -}; - void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem(