diff --git a/services/backup/CMakeLists.txt b/services/backup/CMakeLists.txt index 802b734f0..9c128a48e 100644 --- a/services/backup/CMakeLists.txt +++ b/services/backup/CMakeLists.txt @@ -1,110 +1,111 @@ PROJECT(backup C CXX) cmake_minimum_required(VERSION 3.16) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) if(COMMAND cmake_policy) cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) set(CMAKE_CXX_STANDARD 17) set(BUILD_TESTING OFF CACHE BOOL "Turn off tests" FORCE) set(WITH_GTEST "Use Google Test" OFF) # FIND LIBS include(./cmake-components/grpc.cmake) include(./cmake-components/folly.cmake) add_subdirectory(./lib/glog) find_package(AWSSDK REQUIRED COMPONENTS core dynamodb) find_package(Boost 1.40 COMPONENTS program_options REQUIRED) # FIND FILES file(GLOB DOUBLE_CONVERSION_SOURCES "./lib/double-conversion/double-conversion/*.cc") file(GLOB GENERATED_CODE "./_generated/*.cc") file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp") include_directories( ./src + ./src/server-base-reactors ./src/grpc-client ./src/DatabaseEntities ./src/Reactors ./src/Reactors/server ./src/Reactors/server/base-reactors ./src/Reactors/client ./src/Reactors/client/blob ./src/Reactors/client/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion ${Boost_INCLUDE_DIR} ) # SERVER add_executable( backup ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ) set( LIBS ${GRPC_LIBS} ${AWSSDK_LINK_LIBRARIES} ${Boost_LIBRARIES} glog::glog ) target_link_libraries( backup ${LIBS} ) install( TARGETS backup RUNTIME DESTINATION bin/ ) # TEST if ($ENV{COMM_TEST_SERVICES} MATCHES 1) file(GLOB TEST_CODE "./test/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") enable_testing() find_package(GTest REQUIRED) include_directories( ${GTEST_INCLUDE_DIR} ./test ) add_executable( runTests ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ${TEST_CODE} ) target_link_libraries( runTests ${LIBS} gtest gtest_main ) add_test( NAME runTests COMMAND runTests ) endif() diff --git a/services/blob/CMakeLists.txt b/services/blob/CMakeLists.txt index 8b2ed0de0..e85c2196e 100644 --- a/services/blob/CMakeLists.txt +++ b/services/blob/CMakeLists.txt @@ -1,108 +1,109 @@ PROJECT(blob C CXX) cmake_minimum_required(VERSION 3.16) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) if(COMMAND cmake_policy) cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) set(CMAKE_CXX_STANDARD 17) set(BUILD_TESTING OFF CACHE BOOL "Turn off tests" FORCE) set(WITH_GTEST "Use Google Test" OFF) # FIND LIBS include(./cmake-components/grpc.cmake) include(./cmake-components/folly.cmake) add_subdirectory(./lib/glog) find_package(AWSSDK REQUIRED COMPONENTS s3 core dynamodb) find_package(Boost 1.40 COMPONENTS program_options REQUIRED) find_package(OpenSSL REQUIRED) # FIND FILES file(GLOB DOUBLE_CONVERSION_SOURCES "./lib/double-conversion/double-conversion/*.cc") file(GLOB GENERATED_CODE "./_generated/*.cc") file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") include_directories( ./src + ./src/server-base-reactors ./src/DatabaseEntities ./src/Reactors/ ./src/Reactors/server ./src/Reactors/server/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion ${Boost_INCLUDE_DIR} ) # SERVER add_executable( blob ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ) set( LIBS ${GRPC_LIBS} ${AWSSDK_LINK_LIBRARIES} ${Boost_LIBRARIES} OpenSSL::SSL glog::glog ) target_link_libraries( blob ${LIBS} ) install( TARGETS blob RUNTIME DESTINATION bin/ ) # TEST if ($ENV{COMM_TEST_SERVICES} MATCHES 1) file(GLOB TEST_CODE "./test/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") enable_testing() find_package(GTest REQUIRED) include_directories( ${GTEST_INCLUDE_DIR} ./test ) add_executable( runTests ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ${TEST_CODE} ) target_link_libraries( runTests ${LIBS} gtest gtest_main ) add_test( NAME runTests COMMAND runTests ) endif() diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h deleted file mode 100644 index 1b9ae96a3..000000000 --- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ /dev/null @@ -1,157 +0,0 @@ -#pragma once - -#include "BaseReactor.h" - -#include - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -struct ServerBidiReactorStatus { - grpc::Status status; - bool sendLastResponse; - ServerBidiReactorStatus( - grpc::Status status = grpc::Status::OK, - bool sendLastResponse = false) - : status(status), sendLastResponse(sendLastResponse) { - } -}; - -template -class ServerBidiReactorBase : public grpc::ServerBidiReactor, - public BaseReactor { - std::shared_ptr utility; - Request request; - Response response; - -protected: - ServerBidiReactorStatus status; - bool readingAborted = false; - -public: - ServerBidiReactorBase(); - - void terminate(const grpc::Status &status) override; - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - - void OnDone() override; - void OnReadDone(bool ok) override; - void OnWriteDone(bool ok) override; - std::shared_ptr getUtility() override; - - void terminate(ServerBidiReactorStatus status); - ServerBidiReactorStatus getStatus() const; - void setStatus(const ServerBidiReactorStatus &status); - - virtual std::unique_ptr - handleRequest(Request request, Response *response) = 0; -}; - -template -ServerBidiReactorBase::ServerBidiReactorBase() { - this->utility->state = ReactorState::RUNNING; - this->StartRead(&this->request); -} - -template -void ServerBidiReactorBase::terminate( - const grpc::Status &status) { - this->terminate(ServerBidiReactorStatus(status)); -} - -template -void ServerBidiReactorBase::OnDone() { - this->utility->state = ReactorState::DONE; - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; -} - -template -void ServerBidiReactorBase::terminate( - ServerBidiReactorStatus status) { - this->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::runtime_error &e) { - this->setStatus(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); - } - if (this->utility->state != ReactorState::RUNNING) { - return; - } - if (this->getStatus().sendLastResponse) { - this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->getStatus().status); - } else { - this->Finish(this->getStatus().status); - } - this->utility->state = ReactorState::TERMINATED; -} - -template -ServerBidiReactorStatus -ServerBidiReactorBase::getStatus() const { - return this->status; -} - -template -void ServerBidiReactorBase::setStatus( - const ServerBidiReactorStatus &status) { - this->status = status; -} - -template -void ServerBidiReactorBase::OnReadDone(bool ok) { - if (!ok) { - this->readingAborted = true; - // Ending a connection on the other side results in the `ok` flag being set - // to false. It makes it impossible to detect a failure based just on the - // flag. We should manually check if the data we received is valid - this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); - return; - } - try { - this->response = Response(); - std::unique_ptr status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::runtime_error &e) { - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); - } -} - -template -void ServerBidiReactorBase::OnWriteDone(bool ok) { - if (!ok) { - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); - return; - } - this->StartRead(&this->request); -} - -template -std::shared_ptr -ServerBidiReactorBase::getUtility() { - return this->utility; -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h deleted file mode 100644 index 7ece8fb97..000000000 --- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ /dev/null @@ -1,109 +0,0 @@ -#pragma once - -#include "BaseReactor.h" - -#include - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -template -class ServerReadReactorBase : public grpc::ServerReadReactor, - public BaseReactor { - std::shared_ptr utility; - Request request; - -protected: - Response *response; - -public: - ServerReadReactorBase(Response *response); - - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - - void OnReadDone(bool ok) override; - void terminate(const grpc::Status &status) override; - void OnDone() override; - std::shared_ptr getUtility() override; - - virtual std::unique_ptr readRequest(Request request) = 0; -}; - -template -ServerReadReactorBase::ServerReadReactorBase( - Response *response) - : response(response) { - this->utility->state = ReactorState::RUNNING; - this->StartRead(&this->request); -} - -template -void ServerReadReactorBase::OnReadDone(bool ok) { - if (!ok) { - // Ending a connection on the other side results in the `ok` flag being set - // to false. It makes it impossible to detect a failure based just on the - // flag. We should manually check if the data we received is valid - this->terminate(grpc::Status::OK); - return; - } - try { - std::unique_ptr status = this->readRequest(this->request); - if (status != nullptr) { - this->terminate(*status); - return; - } - } catch (std::runtime_error &e) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - return; - } - this->StartRead(&this->request); -} - -template -void ServerReadReactorBase::terminate( - const grpc::Status &status) { - this->utility->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::runtime_error &e) { - this->utility->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() - << std::endl; - } - if (this->utility->state != ReactorState::RUNNING) { - return; - } - this->Finish(this->utility->getStatus()); - this->utility->state = ReactorState::TERMINATED; -} - -template -void ServerReadReactorBase::OnDone() { - this->utility->state = ReactorState::DONE; - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; -} - -template -std::shared_ptr -ServerReadReactorBase::getUtility() { - return this->utility; -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h deleted file mode 100644 index eb7a8c510..000000000 --- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ /dev/null @@ -1,131 +0,0 @@ -#pragma once - -#include "BaseReactor.h" - -#include - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -template -class ServerWriteReactorBase : public grpc::ServerWriteReactor, - public BaseReactor { - std::shared_ptr utility; - Response response; - bool initialized = false; - - void nextWrite(); - -protected: - // this is a const ref since it's not meant to be modified - const Request &request; - -public: - ServerWriteReactorBase(const Request *request); - - void start(); - - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - - virtual void initialize(){}; - void OnWriteDone(bool ok) override; - void terminate(const grpc::Status &status); - void OnDone() override; - std::shared_ptr getUtility() override; - - virtual std::unique_ptr writeResponse(Response *response) = 0; -}; - -template -void ServerWriteReactorBase::terminate( - const grpc::Status &status) { - this->utility->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::runtime_error &e) { - this->utility->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() - << std::endl; - } - if (this->utility->state != ReactorState::RUNNING) { - return; - } - this->Finish(this->utility->getStatus()); - this->utility->state = ReactorState::TERMINATED; -} - -template -ServerWriteReactorBase::ServerWriteReactorBase( - const Request *request) - : request(*request) { - // we cannot call this->start() here because it's going to call it on - // the base class, not derived leading to the runtime error of calling - // a pure virtual function - // start has to be exposed as a public function and called explicitly - // to initialize writing -} - -template -void ServerWriteReactorBase::nextWrite() { - try { - if (!this->initialized) { - this->initialize(); - this->initialized = true; - } - this->response = Response(); - std::unique_ptr status = this->writeResponse(&this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } -} - -template -void ServerWriteReactorBase::start() { - this->utility->state = ReactorState::RUNNING; - this->nextWrite(); -} - -template -void ServerWriteReactorBase::OnDone() { - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; -} - -template -std::shared_ptr -ServerWriteReactorBase::getUtility() { - return this->utility; -} - -template -void ServerWriteReactorBase::OnWriteDone(bool ok) { - if (!ok) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); - return; - } - this->nextWrite(); -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h similarity index 100% rename from services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h rename to services/lib/src/server-base-reactors/ServerBidiReactorBase.h diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h similarity index 100% rename from services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h rename to services/lib/src/server-base-reactors/ServerReadReactorBase.h diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h similarity index 100% rename from services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h rename to services/lib/src/server-base-reactors/ServerWriteReactorBase.h