diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt index b46e3e2a2..0e2658795 100644 --- a/services/backup/CMakeLists.txt +++ b/services/backup/CMakeLists.txt @@ -1,131 +1,131 @@ PROJECT(backup C CXX) cmake_minimum_required(VERSION 3.16) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) if(COMMAND cmake_policy) cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) set(CMAKE_CXX_STANDARD 17) set(BUILD_TESTING OFF CACHE BOOL "Turn off tests" FORCE) set(WITH_GTEST "Use Google Test" OFF) # FIND LIBS include(./cmake-components/grpc.cmake) include(./cmake-components/folly.cmake) add_subdirectory(./lib/glog) find_package(AWSSDK REQUIRED COMPONENTS core dynamodb) find_package(Boost 1.40 COMPONENTS program_options REQUIRED) # FIND FILES file(GLOB DOUBLE_CONVERSION_SOURCES "./lib/double-conversion/double-conversion/*.cc") if ($ENV{COMM_TEST_SERVICES} MATCHES 1) add_compile_definitions(COMM_TEST_SERVICES) endif() file(GLOB GENERATED_CODE "./_generated/*.cc") set(DEV_SOURCE_CODE "") set(DEV_HEADERS_PATH "") if ($ENV{COMM_SERVICES_DEV_MODE} MATCHES 1) add_compile_definitions(COMM_SERVICES_DEV_MODE) file(GLOB DEV_SOURCE_CODE "./dev/*.cpp" "./src/*.dev.cpp") set(DEV_HEADERS_PATH "./dev") endif() -file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") +file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$") foreach (ITEM ${DEV_SOURCE_CODE}) string(REPLACE "/" ";" SPLIT_ITEM ${ITEM}) list(GET SPLIT_ITEM -1 FILE_FULL_NAME) string(REPLACE ".dev.cpp" ".cpp" FILE_NAME ${FILE_FULL_NAME}) list(FILTER SOURCE_CODE EXCLUDE REGEX ".*${FILE_NAME}$") list(APPEND SOURCE_CODE "${ITEM}") endforeach() include_directories( ./src ./src/grpc-client ./src/DatabaseEntities ./src/Reactors ./src/Reactors/server ./src/Reactors/server/base-reactors ./src/Reactors/client ./src/Reactors/client/blob ./src/Reactors/client/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion ${Boost_INCLUDE_DIR} ${DEV_HEADERS_PATH} ) # SERVER add_executable( backup ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ) set( LIBS ${GRPC_LIBS} ${AWSSDK_LINK_LIBRARIES} ${Boost_LIBRARIES} glog::glog ) target_link_libraries( backup ${LIBS} ) install( TARGETS backup RUNTIME DESTINATION bin/ ) # TEST if ($ENV{COMM_TEST_SERVICES} MATCHES 1) file(GLOB TEST_CODE "./test/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") enable_testing() find_package(GTest REQUIRED) include_directories( ${GTEST_INCLUDE_DIR} ./test ) add_executable( runTests ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ${TEST_CODE} ) target_link_libraries( runTests ${LIBS} gtest gtest_main ) add_test( NAME runTests COMMAND runTests ) endif() diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp similarity index 51% copy from services/backup/src/Reactors/client/blob/BlobGetClientReactor.h copy to services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp index e15fbc3ea..be85c41da 100644 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -1,58 +1,32 @@ -#pragma once -#include "ClientReadReactorBase.h" - -#include "../_generated/blob.grpc.pb.h" -#include "../_generated/blob.pb.h" - -#include -#include - -#include -#include +#include "BlobGetClientReactor.h" namespace comm { namespace network { namespace reactor { -class BlobGetClientReactor - : public ClientReadReactorBase { - std::string holder; - std::shared_ptr> dataChunks; - -public: - BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks); - - std::unique_ptr - readResponse(blob::GetResponse &response) override; - void doneCallback() override; - grpc::Status getStatus() const; -}; - BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks) : holder(holder), dataChunks(dataChunks) { } std::unique_ptr BlobGetClientReactor::readResponse(blob::GetResponse &response) { if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { throw std::runtime_error("error reading data from the blob service"); } return nullptr; } void BlobGetClientReactor::doneCallback() { this->dataChunks->write(""); } grpc::Status BlobGetClientReactor::getStatus() const { return this->status; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h index e15fbc3ea..162a38f64 100644 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -1,58 +1,36 @@ #pragma once -#include "ClientReadReactorBase.h" - #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "ClientReadReactorBase.h" + #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobGetClientReactor : public ClientReadReactorBase { std::string holder; std::shared_ptr> dataChunks; public: BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks); std::unique_ptr readResponse(blob::GetResponse &response) override; void doneCallback() override; grpc::Status getStatus() const; }; -BlobGetClientReactor::BlobGetClientReactor( - const std::string &holder, - std::shared_ptr> dataChunks) - : holder(holder), dataChunks(dataChunks) { -} - -std::unique_ptr -BlobGetClientReactor::readResponse(blob::GetResponse &response) { - if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { - throw std::runtime_error("error reading data from the blob service"); - } - return nullptr; -} - -void BlobGetClientReactor::doneCallback() { - this->dataChunks->write(""); -} - -grpc::Status BlobGetClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp similarity index 57% copy from services/backup/src/Reactors/client/blob/BlobPutClientReactor.h copy to services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp index ffe03bac9..372e7d4ba 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -1,105 +1,62 @@ -#pragma once +#include "BlobPutClientReactor.h" -#include "ClientBidiReactorBase.h" -#include "Constants.h" - -#include "../_generated/blob.grpc.pb.h" -#include "../_generated/blob.pb.h" - -#include -#include - -#include #include -#include -#include namespace comm { namespace network { namespace reactor { -class BlobPutClientReactor - : public ClientBidiReactorBase { - - enum class State { - SEND_HOLDER = 0, - SEND_HASH = 1, - SEND_CHUNKS = 2, - }; - - State state = State::SEND_HOLDER; - const std::string hash; - const std::string holder; - size_t currentDataSize = 0; - const size_t chunkSize = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - folly::MPMCQueue dataChunks; - std::condition_variable *terminationNotifier; - -public: - BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier); - void scheduleSendingDataChunk(std::unique_ptr dataChunk); - std::unique_ptr prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) override; - void doneCallback() override; - grpc::Status getStatus() const; -}; - BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier) : holder(holder), hash(hash), dataChunks(folly::MPMCQueue(100)), terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk( std::unique_ptr dataChunk) { if (!this->dataChunks.write(std::move(*dataChunk))) { throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } } std::unique_ptr BlobPutClientReactor::prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) { if (this->state == State::SEND_HOLDER) { this->request.set_holder(this->holder); this->state = State::SEND_HASH; return nullptr; } if (this->state == State::SEND_HASH) { request.set_blobhash(this->hash); this->state = State::SEND_CHUNKS; return nullptr; } if (previousResponse->dataexists()) { return std::make_unique(grpc::Status::OK); } std::string dataChunk; this->dataChunks.blockingRead(dataChunk); if (dataChunk.empty()) { return std::make_unique(grpc::Status::OK); } request.set_datachunk(dataChunk); return nullptr; } void BlobPutClientReactor::doneCallback() { this->terminationNotifier->notify_one(); } grpc::Status BlobPutClientReactor::getStatus() const { return this->status; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h index ffe03bac9..df6a78de8 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,105 +1,54 @@ #pragma once -#include "ClientBidiReactorBase.h" #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "ClientBidiReactorBase.h" + #include #include #include -#include #include #include namespace comm { namespace network { namespace reactor { class BlobPutClientReactor : public ClientBidiReactorBase { enum class State { SEND_HOLDER = 0, SEND_HASH = 1, SEND_CHUNKS = 2, }; State state = State::SEND_HOLDER; const std::string hash; const std::string holder; size_t currentDataSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; std::condition_variable *terminationNotifier; public: BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier); void scheduleSendingDataChunk(std::unique_ptr dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) override; void doneCallback() override; grpc::Status getStatus() const; }; -BlobPutClientReactor::BlobPutClientReactor( - const std::string &holder, - const std::string &hash, - std::condition_variable *terminationNotifier) - : holder(holder), - hash(hash), - dataChunks(folly::MPMCQueue(100)), - terminationNotifier(terminationNotifier) { -} - -void BlobPutClientReactor::scheduleSendingDataChunk( - std::unique_ptr dataChunk) { - if (!this->dataChunks.write(std::move(*dataChunk))) { - throw std::runtime_error( - "Error scheduling sending a data chunk to send to the blob service"); - } -} - -std::unique_ptr BlobPutClientReactor::prepareRequest( - blob::PutRequest &request, - std::shared_ptr previousResponse) { - if (this->state == State::SEND_HOLDER) { - this->request.set_holder(this->holder); - this->state = State::SEND_HASH; - return nullptr; - } - if (this->state == State::SEND_HASH) { - request.set_blobhash(this->hash); - this->state = State::SEND_CHUNKS; - return nullptr; - } - if (previousResponse->dataexists()) { - return std::make_unique(grpc::Status::OK); - } - std::string dataChunk; - this->dataChunks.blockingRead(dataChunk); - if (dataChunk.empty()) { - return std::make_unique(grpc::Status::OK); - } - request.set_datachunk(dataChunk); - return nullptr; -} - -void BlobPutClientReactor::doneCallback() { - this->terminationNotifier->notify_one(); -} - -grpc::Status BlobPutClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp similarity index 72% copy from services/backup/src/Reactors/server/CreateNewBackupReactor.h copy to services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index e8e93cc33..a62c04c12 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,137 +1,94 @@ -#pragma once +#include "CreateNewBackupReactor.h" #include "DatabaseManager.h" -#include "ServerBidiReactorBase.h" -#include "ServiceBlobClient.h" #include "Tools.h" -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - -#include -#include -#include -#include - namespace comm { namespace network { namespace reactor { -class CreateNewBackupReactor : public ServerBidiReactorBase< - backup::CreateNewBackupRequest, - backup::CreateNewBackupResponse> { - enum class State { - USER_ID = 1, - KEY_ENTROPY = 2, - DATA_HASH = 3, - DATA_CHUNKS = 4, - }; - - State state = State::USER_ID; - std::string userID; - std::string keyEntropy; - std::string dataHash; - std::string holder; - std::string backupID; - std::shared_ptr putReactor; - - ServiceBlobClient blobClient; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::string generateBackupID(); - -public: - std::unique_ptr handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) override; - void terminateCallback() override; -}; - std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); response->set_backupid(this->backupID); this->holder = this->backupID; this->putReactor = std::make_shared( this->holder, this->dataHash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); if (!this->putReactor->isDone()) { this->blobPutDoneCV.wait(lock2); } else if (!this->putReactor->getStatus().ok()) { throw std::runtime_error(this->putReactor->getStatus().error_message()); } try { // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, getCurrentTimestamp(), generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } catch (std::runtime_error &e) { std::cout << "db operations error: " << e.what() << std::endl; } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h index e8e93cc33..e45a39698 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h @@ -1,137 +1,54 @@ #pragma once -#include "DatabaseManager.h" -#include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" -#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include "ServerBidiReactorBase.h" + #include #include #include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { USER_ID = 1, KEY_ENTROPY = 2, DATA_HASH = 3, DATA_CHUNKS = 4, }; State state = State::USER_ID; std::string userID; std::string keyEntropy; std::string dataHash; std::string holder; std::string backupID; std::shared_ptr putReactor; ServiceBlobClient blobClient; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void terminateCallback() override; }; -std::string CreateNewBackupReactor::generateBackupID() { - // mock - return generateRandomString(); -} - -std::unique_ptr CreateNewBackupReactor::handleRequest( - backup::CreateNewBackupRequest request, - backup::CreateNewBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::KEY_ENTROPY; - return nullptr; - } - case State::KEY_ENTROPY: { - if (!request.has_keyentropy()) { - throw std::runtime_error( - "backup key entropy expected but not received"); - } - this->keyEntropy = request.keyentropy(); - this->state = State::DATA_HASH; - return nullptr; - } - case State::DATA_HASH: { - if (!request.has_newcompactionhash()) { - throw std::runtime_error("data hash expected but not received"); - } - this->dataHash = request.newcompactionhash(); - this->state = State::DATA_CHUNKS; - - // TODO confirm - holder may be a backup id - this->backupID = this->generateBackupID(); - response->set_backupid(this->backupID); - this->holder = this->backupID; - this->putReactor = std::make_shared( - this->holder, this->dataHash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - return nullptr; - } - case State::DATA_CHUNKS: { - this->putReactor->scheduleSendingDataChunk(std::make_unique( - std::move(*request.mutable_newcompactionchunk()))); - return nullptr; - } - } - throw std::runtime_error("new backup - invalid state"); -} - -void CreateNewBackupReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - if (this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lock2(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { - this->blobPutDoneCV.wait(lock2); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); - } - try { - // TODO add recovery data - // TODO handle attachments holders - database::BackupItem backupItem( - this->userID, - this->backupID, - getCurrentTimestamp(), - generateRandomString(), - this->holder, - {}); - database::DatabaseManager::getInstance().putBackupItem(backupItem); - } catch (std::runtime_error &e) { - std::cout << "db operations error: " << e.what() << std::endl; - } -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.cpp similarity index 81% copy from services/backup/src/Reactors/server/PullBackupReactor.h copy to services/backup/src/Reactors/server/PullBackupReactor.cpp index 2b5173e1d..09b3d3a5e 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,188 +1,145 @@ -#pragma once +#include "PullBackupReactor.h" -#include "BlobGetClientReactor.h" -#include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" -#include "ServerWriteReactorBase.h" -#include "ServiceBlobClient.h" - -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - -#include #include -#include -#include -#include namespace comm { namespace network { namespace reactor { -class PullBackupReactor : public ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse> { - - enum class State { - COMPACTION = 1, - LOGS = 2, - }; - - std::shared_ptr backupItem; - std::shared_ptr getReactor; - std::mutex reactorStateMutex; - std::shared_ptr> dataChunks; - ServiceBlobClient blobClient; - State state = State::COMPACTION; - std::vector> logs; - size_t currentLogIndex = 0; - std::shared_ptr currentLog; - - void initializeGetReactor(const std::string &holder); - -public: - PullBackupReactor(const backup::PullBackupRequest *request); - - void initialize() override; - - std::unique_ptr - writeResponse(backup::PullBackupResponse *response) override; - void terminateCallback() override; -}; - PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } this->getReactor.reset( new reactor::BlobGetClientReactor(holder, this->dataChunks)); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { if (this->getReactor == nullptr) { this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!dataChunk.empty()) { response->set_compactionchunk(dataChunk); return nullptr; } if (!this->dataChunks->isEmpty()) { throw std::runtime_error( "dangling data discovered after reading compaction"); } if (!this->getReactor->getStatus().ok()) { throw std::runtime_error(this->getReactor->getStatus().error_message()); } this->state = State::LOGS; } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { // this means that there are no logs at all so we just terminate with the // compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { // we reached the end of the logs collection so we just want to terminate // either we terminate with an error if we have some dangling data // or with success if we don't if (!this->dataChunks->isEmpty()) { throw std::runtime_error("dangling data discovered after reading logs"); } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { // we went out of the scope of the logs collection, this should never // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } // this means that we're not reading anything between invocations of // writeResponse // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor and // proceed this->initializeGetReactor(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logchunk(this->currentLog->getValue()); ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } } // we want to read the chunks from the blob through the get client until we // get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!this->getReactor->getStatus().ok()) { throw std::runtime_error(this->getReactor->getStatus().error_message()); } // if we get an empty chunk, we reset the currentLog so we can read the next // one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::terminateCallback() { if (!this->getReactor->getStatus().ok()) { throw std::runtime_error(this->getReactor->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.h b/services/backup/src/Reactors/server/PullBackupReactor.h index 2b5173e1d..9e4b0bab6 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/src/Reactors/server/PullBackupReactor.h @@ -1,188 +1,55 @@ #pragma once #include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" -#include "DatabaseManager.h" -#include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" +#include "ServerWriteReactorBase.h" + #include -#include #include #include #include namespace comm { namespace network { namespace reactor { class PullBackupReactor : public ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { enum class State { COMPACTION = 1, LOGS = 2, }; std::shared_ptr backupItem; std::shared_ptr getReactor; std::mutex reactorStateMutex; std::shared_ptr> dataChunks; ServiceBlobClient blobClient; State state = State::COMPACTION; std::vector> logs; size_t currentLogIndex = 0; std::shared_ptr currentLog; void initializeGetReactor(const std::string &holder); public: PullBackupReactor(const backup::PullBackupRequest *request); void initialize() override; std::unique_ptr writeResponse(backup::PullBackupResponse *response) override; void terminateCallback() override; }; -PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) - : ServerWriteReactorBase< - backup::PullBackupRequest, - backup::PullBackupResponse>(request), - dataChunks(std::make_shared>(100)) { -} - -void PullBackupReactor::initializeGetReactor(const std::string &holder) { - if (this->backupItem == nullptr) { - throw std::runtime_error( - "get reactor cannot be initialized when backup item is missing"); - } - this->getReactor.reset( - new reactor::BlobGetClientReactor(holder, this->dataChunks)); - this->getReactor->request.set_holder(holder); - this->blobClient.get(this->getReactor); -} - -void PullBackupReactor::initialize() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->request.userid().empty()) { - throw std::runtime_error("no user id provided"); - } - if (this->request.backupid().empty()) { - throw std::runtime_error("no backup id provided"); - } - this->backupItem = database::DatabaseManager::getInstance().findBackupItem( - this->request.userid(), this->request.backupid()); - if (this->backupItem == nullptr) { - throw std::runtime_error( - "no backup found for provided parameters: user id [" + - this->request.userid() + "], backup id [" + this->request.backupid() + - "]"); - } - this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( - this->request.backupid()); -} - -std::unique_ptr -PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - if (this->state == State::COMPACTION) { - if (this->getReactor == nullptr) { - this->initializeGetReactor(this->backupItem->getCompactionHolder()); - } - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!dataChunk.empty()) { - response->set_compactionchunk(dataChunk); - return nullptr; - } - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error( - "dangling data discovered after reading compaction"); - } - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } - this->state = State::LOGS; - } - if (this->state == State::LOGS) { - // TODO make sure logs are received in correct order regardless their size - if (this->logs.empty()) { - // this means that there are no logs at all so we just terminate with the - // compaction - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex == this->logs.size()) { - // we reached the end of the logs collection so we just want to terminate - // either we terminate with an error if we have some dangling data - // or with success if we don't - if (!this->dataChunks->isEmpty()) { - throw std::runtime_error("dangling data discovered after reading logs"); - } - return std::make_unique(grpc::Status::OK); - } - if (this->currentLogIndex > this->logs.size()) { - // we went out of the scope of the logs collection, this should never - // happen and should be perceived as an error - throw std::runtime_error("log index out of bound"); - } - // this means that we're not reading anything between invocations of - // writeResponse - // it is only not null when we read data in chunks - if (this->currentLog == nullptr) { - this->currentLog = this->logs.at(this->currentLogIndex); - if (this->currentLog->getPersistedInBlob()) { - // if the item is stored in the blob, we initialize the get reactor and - // proceed - this->initializeGetReactor(this->currentLog->getValue()); - } else { - // if the item is persisted in the database, we just take it, send the - // data to the client and reset currentLog so the next invocation of - // writeResponse will take another one from the collection - response->set_logchunk(this->currentLog->getValue()); - ++this->currentLogIndex; - this->currentLog = nullptr; - return nullptr; - } - } - // we want to read the chunks from the blob through the get client until we - // get an empty chunk - a sign of "end of chunks" - std::string dataChunk; - this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } - // if we get an empty chunk, we reset the currentLog so we can read the next - // one from the logs collection. - // If there's data inside, we write it to the client and proceed. - if (dataChunk.empty()) { - ++this->currentLogIndex; - this->currentLog = nullptr; - return nullptr; - } else { - response->set_logchunk(dataChunk); - } - return nullptr; - } - throw std::runtime_error("unhandled state"); -} - -void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); - } -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.cpp similarity index 77% copy from services/backup/src/Reactors/server/SendLogReactor.h copy to services/backup/src/Reactors/server/SendLogReactor.cpp index 15a9b5887..7882222b2 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,213 +1,156 @@ -#pragma once +#include "SendLogReactor.h" #include "Constants.h" -#include "ServerReadReactorBase.h" -#include "ServiceBlobClient.h" +#include "DatabaseManager.h" #include "Tools.h" -#include "../_generated/backup.grpc.pb.h" -#include "../_generated/backup.pb.h" - #include -#include -#include namespace comm { namespace network { namespace reactor { -class SendLogReactor : public ServerReadReactorBase< - backup::SendLogRequest, - google::protobuf::Empty> { - enum class State { - USER_ID = 1, - BACKUP_ID = 2, - LOG_HASH = 3, - LOG_CHUNK = 4, - }; - - enum class PersistenceMethod { - UNKNOWN = 0, - DB = 1, - BLOB = 2, - }; - - State state = State::USER_ID; - PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; - std::string userID; - std::string backupID; - std::string hash; - // either the value itself which is a dump of a single operation (if - // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is - // true) - std::string value; - std::mutex reactorStateMutex; - - std::condition_variable blobPutDoneCV; - std::mutex blobPutDoneCVMutex; - - std::shared_ptr putReactor; - ServiceBlobClient blobClient; - - void storeInDatabase(); - std::string generateHolder(); - std::string generateLogID(); - void initializePutReactor(); - - void storeInBlob(const std::string &data) { - } - -public: - using ServerReadReactorBase:: - ServerReadReactorBase; - - std::unique_ptr - readRequest(backup::SendLogRequest request) override; - void doneCallback() override; - void terminateCallback() override; -}; - void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->generateLogID(), (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateHolder() { // TODO replace mock return generateRandomString(); } std::string SendLogReactor::generateLogID() { // TODO replace mock return generateRandomString(); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = this->generateHolder(); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (!this->putReactor->isDone()) { this->blobPutDoneCV.wait(lockPut); } else if (!this->putReactor->getStatus().ok()) { throw std::runtime_error(this->putReactor->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->status.error_code() << "/" << this->status.error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h index 15a9b5887..eb99a3f42 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,213 +1,66 @@ #pragma once -#include "Constants.h" #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" -#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" -#include #include #include namespace comm { namespace network { namespace reactor { class SendLogReactor : public ServerReadReactorBase< backup::SendLogRequest, google::protobuf::Empty> { enum class State { USER_ID = 1, BACKUP_ID = 2, LOG_HASH = 3, LOG_CHUNK = 4, }; enum class PersistenceMethod { UNKNOWN = 0, DB = 1, BLOB = 2, }; State state = State::USER_ID; PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; std::string userID; std::string backupID; std::string hash; // either the value itself which is a dump of a single operation (if // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is // true) std::string value; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; std::shared_ptr putReactor; ServiceBlobClient blobClient; void storeInDatabase(); std::string generateHolder(); std::string generateLogID(); void initializePutReactor(); - void storeInBlob(const std::string &data) { - } - public: using ServerReadReactorBase:: ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; void terminateCallback() override; }; -void SendLogReactor::storeInDatabase() { - // TODO handle attachment holders - database::LogItem logItem( - this->backupID, - this->generateLogID(), - (this->persistenceMethod == PersistenceMethod::BLOB), - this->value, - {}); - database::DatabaseManager::getInstance().putLogItem(logItem); -} - -std::string SendLogReactor::generateHolder() { - // TODO replace mock - return generateRandomString(); -} - -std::string SendLogReactor::generateLogID() { - // TODO replace mock - return generateRandomString(); -} - -void SendLogReactor::initializePutReactor() { - if (this->value.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty value"); - } - if (this->hash.empty()) { - throw std::runtime_error( - "put reactor cannot be initialized with empty hash"); - } - if (this->putReactor == nullptr) { - this->putReactor = std::make_shared( - this->value, this->hash, &this->blobPutDoneCV); - this->blobClient.put(this->putReactor); - } -} - -std::unique_ptr -SendLogReactor::readRequest(backup::SendLogRequest request) { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - switch (this->state) { - case State::USER_ID: { - if (!request.has_userid()) { - throw std::runtime_error("user id expected but not received"); - } - this->userID = request.userid(); - this->state = State::BACKUP_ID; - return nullptr; - }; - case State::BACKUP_ID: { - if (!request.has_backupid()) { - throw std::runtime_error("backup id expected but not received"); - } - this->backupID = request.backupid(); - this->state = State::LOG_HASH; - return nullptr; - }; - case State::LOG_HASH: { - if (!request.has_loghash()) { - throw std::runtime_error("log hash expected but not received"); - } - this->hash = request.loghash(); - this->state = State::LOG_CHUNK; - return nullptr; - }; - case State::LOG_CHUNK: { - if (!request.has_logdata()) { - throw std::runtime_error("log data expected but not received"); - } - std::unique_ptr chunk = - std::make_unique(std::move(*request.mutable_logdata())); - if (chunk->size() == 0) { - return std::make_unique(grpc::Status::OK); - } - // decide if keep in DB or upload to blob - if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::DB; - this->value = std::move(*chunk); - this->storeInDatabase(); - return std::make_unique(grpc::Status::OK); - } else if (this->persistenceMethod == PersistenceMethod::BLOB) { - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } else { - throw std::runtime_error( - "error - invalid persistence state for chunk smaller than " - "database limit"); - } - } else { - if (this->persistenceMethod != PersistenceMethod::UNKNOWN && - this->persistenceMethod != PersistenceMethod::BLOB) { - throw std::runtime_error( - "error - invalid persistence state, uploading to blob should be " - "continued but it is not"); - } - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::BLOB; - } - if (this->value.empty()) { - this->value = this->generateHolder(); - } - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } - - return nullptr; - }; - } - throw std::runtime_error("send log - invalid state"); -} - -void SendLogReactor::terminateCallback() { - const std::lock_guard lock(this->reactorStateMutex); - - if (this->persistenceMethod == PersistenceMethod::DB || - this->putReactor == nullptr) { - return; - } - this->putReactor->scheduleSendingDataChunk(std::make_unique("")); - std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { - this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); - } - // store in db only when we successfully upload chunks - this->storeInDatabase(); -} - -void SendLogReactor::doneCallback() { - // we make sure that the blob client's state is flushed to the main memory - // as there may be multiple threads from the pool taking over here - const std::lock_guard lock(this->reactorStateMutex); - // TODO implement - std::cout << "receive logs done " << this->status.error_code() << "/" - << this->status.error_message() << std::endl; -} - } // namespace reactor } // namespace network } // namespace comm