diff --git a/services/backup/docker-server/contents/server/CMakeLists.txt b/services/backup/docker-server/contents/server/CMakeLists.txt index 65f9008f0..3d304c3c6 100644 --- a/services/backup/docker-server/contents/server/CMakeLists.txt +++ b/services/backup/docker-server/contents/server/CMakeLists.txt @@ -1,125 +1,130 @@ PROJECT(backup C CXX) cmake_minimum_required(VERSION 3.16) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) if(COMMAND cmake_policy) cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) set(CMAKE_CXX_STANDARD 17) set(BUILD_TESTING OFF CACHE BOOL "Turn off tests" FORCE) set(WITH_GTEST "Use Google Test" OFF) # FIND LIBS include(./cmake-components/grpc.cmake) include(./cmake-components/folly.cmake) add_subdirectory(./lib/glog) find_package(AWSSDK REQUIRED COMPONENTS core dynamodb) find_package(Boost 1.40 COMPONENTS program_options REQUIRED) # FIND FILES file(GLOB DOUBLE_CONVERSION_SOURCES "./lib/double-conversion/double-conversion/*.cc") if ($ENV{COMM_TEST_SERVICES} MATCHES 1) add_compile_definitions(COMM_TEST_SERVICES) endif() file(GLOB GENERATED_CODE "./_generated/*.cc") set(DEV_SOURCE_CODE "") set(DEV_HEADERS_PATH "") if ($ENV{COMM_SERVICES_DEV_MODE} MATCHES 1) add_compile_definitions(COMM_SERVICES_DEV_MODE) file(GLOB DEV_SOURCE_CODE "./dev/*.cpp" "./src/*.dev.cpp") set(DEV_HEADERS_PATH "./dev") endif() file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$") foreach (ITEM ${DEV_SOURCE_CODE}) string(REPLACE "/" ";" SPLIT_ITEM ${ITEM}) list(GET SPLIT_ITEM -1 FILE_FULL_NAME) string(REPLACE ".dev.cpp" ".cpp" FILE_NAME ${FILE_FULL_NAME}) list(FILTER SOURCE_CODE EXCLUDE REGEX ".*${FILE_NAME}$") list(APPEND SOURCE_CODE "${ITEM}") endforeach() include_directories( ./src ./src/DatabaseEntities ./src/Reactors + ./src/Reactors/server + ./src/Reactors/server/base-reactors + ./src/Reactors/client + ./src/Reactors/client/blob + ./src/Reactors/client/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion ${Boost_INCLUDE_DIR} ${DEV_HEADERS_PATH} ) # SERVER add_executable( backup ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ) set( LIBS ${GRPC_LIBS} ${AWSSDK_LINK_LIBRARIES} ${Boost_LIBRARIES} glog::glog ) target_link_libraries( backup ${LIBS} ) install( TARGETS backup RUNTIME DESTINATION bin/ ) # TEST if ($ENV{COMM_TEST_SERVICES} MATCHES 1) file(GLOB TEST_CODE "./test/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") enable_testing() find_package(GTest REQUIRED) include_directories( ${GTEST_INCLUDE_DIR} ./test ) add_executable( runTests ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ${TEST_CODE} ) target_link_libraries( runTests ${LIBS} gtest gtest_main ) add_test( NAME runTests COMMAND runTests ) endif() diff --git a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp index 5b30da4fd..897622274 100644 --- a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp +++ b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp @@ -1,98 +1,98 @@ #include "BackupServiceImpl.h" -#include "BidiReactorBase.h" #include "ReadReactorBase.h" +#include "ServerBidiReactorBase.h" #include namespace comm { namespace network { BackupServiceImpl::BackupServiceImpl() { Aws::InitAPI({}); } BackupServiceImpl::~BackupServiceImpl() { Aws::ShutdownAPI({}); } grpc::ServerBidiReactor< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { - class CreateNewBackupReactor : public BidiReactorBase< + class CreateNewBackupReactor : public reactor::ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { public: - std::unique_ptr handleRequest( + std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override { // TODO handle request - return std::make_unique( - grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); + return std::make_unique( + grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } }; return new CreateNewBackupReactor(); } grpc::ServerReadReactor *BackupServiceImpl::SendLog( grpc::CallbackServerContext *context, google::protobuf::Empty *response) { class SendLogReactor : public ReadReactorBase< backup::SendLogRequest, google::protobuf::Empty> { public: using ReadReactorBase:: ReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override { // TODO handle request return std::make_unique( grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); } }; return new SendLogReactor(response); } grpc::ServerBidiReactor< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { - class RecoverBackupKeyReactor : public BidiReactorBase< + class RecoverBackupKeyReactor : public reactor::ServerBidiReactorBase< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> { public: - std::unique_ptr handleRequest( + std::unique_ptr handleRequest( backup::RecoverBackupKeyRequest request, backup::RecoverBackupKeyResponse *response) override { // TODO handle request - return std::make_unique( - grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); + return std::make_unique( + grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } }; return new RecoverBackupKeyReactor(); } grpc::ServerBidiReactor * BackupServiceImpl::PullBackup(grpc::CallbackServerContext *context) { - class PullBackupReactor : public BidiReactorBase< + class PullBackupReactor : public reactor::ServerBidiReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { public: - std::unique_ptr handleRequest( + std::unique_ptr handleRequest( backup::PullBackupRequest request, backup::PullBackupResponse *response) override { // TODO handle request - return std::make_unique( - grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); + return std::make_unique( + grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } }; return new PullBackupReactor(); } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h deleted file mode 100644 index 082d2d749..000000000 --- a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace comm { -namespace network { - -template -class BidiReactorBase : public grpc::ServerBidiReactor { - Request request; - Response response; - -public: - BidiReactorBase(); - - void OnDone() override; - void OnReadDone(bool ok) override; - void OnWriteDone(bool ok) override; - - virtual std::unique_ptr - handleRequest(Request request, Response *response) = 0; -}; - -template -BidiReactorBase::BidiReactorBase() { - this->StartRead(&this->request); -} - -template -void BidiReactorBase::OnDone() { - delete this; -} - -template -void BidiReactorBase::OnReadDone(bool ok) { - if (!ok) { - this->Finish( - grpc::Status(grpc::StatusCode::INTERNAL, "OnReadDone: reading error")); - return; - } - this->response = Response(); - try { - std::unique_ptr status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->Finish(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::runtime_error &e) { - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } -} - -template -void BidiReactorBase::OnWriteDone(bool ok) { - if (!ok) { - std::cout << "Server write failed" << std::endl; - return; - } - this->StartRead(&this->request); -} - -} // namespace network -} // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h new file mode 100644 index 000000000..5cc9a0809 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -0,0 +1,107 @@ +#pragma once + +#include + +#include +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +struct ServerBidiReactorStatus { + grpc::Status status; + bool sendLastResponse; + ServerBidiReactorStatus( + grpc::Status status = grpc::Status::OK, + bool sendLastResponse = false) + : status(status), sendLastResponse(sendLastResponse) { + } +}; + +template +class ServerBidiReactorBase + : public grpc::ServerBidiReactor { + Request request; + Response response; + +protected: + ServerBidiReactorStatus status; + bool readingAborted = false; + +public: + ServerBidiReactorBase(); + + void OnDone() override; + void OnReadDone(bool ok) override; + void OnWriteDone(bool ok) override; + + void terminate(ServerBidiReactorStatus status); + + virtual std::unique_ptr + handleRequest(Request request, Response *response) = 0; + virtual void initialize(){}; + virtual void doneCallback(){}; +}; + +template +ServerBidiReactorBase::ServerBidiReactorBase() { + this->initialize(); + this->StartRead(&this->request); +} + +template +void ServerBidiReactorBase::OnDone() { + this->doneCallback(); + delete this; +} + +template +void ServerBidiReactorBase::terminate( + ServerBidiReactorStatus status) { + this->status = status; + if (this->status.sendLastResponse) { + this->StartWriteAndFinish( + &this->response, grpc::WriteOptions(), this->status.status); + } else { + this->Finish(this->status.status); + } +} + +template +void ServerBidiReactorBase::OnReadDone(bool ok) { + if (!ok) { + this->readingAborted = true; + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::ABORTED, "no more reads"))); + return; + } + try { + this->response = Response(); + std::unique_ptr status = + this->handleRequest(this->request, &this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + } catch (std::runtime_error &e) { + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); + } +} + +template +void ServerBidiReactorBase::OnWriteDone(bool ok) { + if (!ok) { + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); + return; + } + this->StartRead(&this->request); +} + +} // namespace reactor +} // namespace network +} // namespace comm