diff --git a/services/lib/cmake-components/folly.cmake b/services/lib/cmake-components/folly.cmake deleted file mode 100644 --- a/services/lib/cmake-components/folly.cmake +++ /dev/null @@ -1,54 +0,0 @@ -add_definitions( - -DFOLLY_NO_CONFIG=1 - -DFOLLY_HAVE_CLOCK_GETTIME=1 - -DFOLLY_HAVE_MEMRCHR=1 - -DFOLLY_USE_LIBCPP=0 - -DFOLLY_MOBILE=0 -) - -set( - FOLLY_SOURCES - - ./lib/folly/folly/detail/Futex.cpp - ./lib/folly/folly/synchronization/ParkingLot.cpp - ./lib/folly/folly/lang/SafeAssert.cpp - ./lib/folly/folly/FileUtil.cpp - ./lib/folly/folly/Subprocess.cpp - ./lib/folly/folly/File.cpp - ./lib/folly/folly/Format.cpp - ./lib/folly/folly/Conv.cpp - ./lib/folly/folly/io/IOBuf.cpp - ./lib/folly/folly/memory/detail/MallocImpl.cpp - ./lib/folly/folly/ScopeGuard.cpp - ./lib/folly/folly/hash/SpookyHashV2.cpp - ./lib/folly/folly/io/IOBufQueue.cpp - ./lib/folly/folly/lang/Assume.cpp - ./lib/folly/folly/String.cpp - ./lib/folly/folly/portability/SysUio.cpp - ./lib/folly/folly/net/NetOps.cpp - ./lib/folly/folly/synchronization/Hazptr.cpp - ./lib/folly/folly/detail/ThreadLocalDetail.cpp - ./lib/folly/folly/SharedMutex.cpp - ./lib/folly/folly/concurrency/CacheLocality.cpp - ./lib/folly/folly/detail/StaticSingletonManager.cpp - ./lib/folly/folly/executors/ThreadPoolExecutor.cpp - ./lib/folly/folly/executors/GlobalThreadPoolList.cpp - ./lib/folly/folly/Demangle.cpp - ./lib/folly/folly/synchronization/AsymmetricMemoryBarrier.cpp - ./lib/folly/folly/io/async/Request.cpp - ./lib/folly/folly/detail/MemoryIdler.cpp - ./lib/folly/folly/detail/AtFork.cpp - ./lib/folly/folly/Executor.cpp - ./lib/folly/folly/lang/CString.cpp - ./lib/folly/folly/portability/SysMembarrier.cpp - ./lib/folly/folly/container/detail/F14Table.cpp - ./lib/folly/folly/detail/UniqueInstance.cpp - ./lib/folly/folly/executors/QueuedImmediateExecutor.cpp - ./lib/folly/folly/memory/MallctlHelper.cpp -) - -set( - FOLLY_INCLUDES - - ./lib/folly -) diff --git a/services/lib/cmake-components/grpc.cmake b/services/lib/cmake-components/grpc.cmake deleted file mode 100644 --- a/services/lib/cmake-components/grpc.cmake +++ /dev/null @@ -1,25 +0,0 @@ -# Disable naming conventions, as the variable names are determined by upstream -# cmake-lint: disable=C0103 - -# protobuf -set(protobuf_MODULE_COMPATIBLE TRUE) -find_package(Protobuf CONFIG REQUIRED) -message(STATUS "Using protobuf ${Protobuf_VERSION}") - -set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf) -set(_PROTOBUF_PROTOC $) - -# Find gRPC installation -find_package(gRPC CONFIG REQUIRED) -message(STATUS "Using gRPC ${gRPC_VERSION}") - -set(_GRPC_GRPCPP gRPC::grpc++) -set(_GRPC_CPP_PLUGIN_EXECUTABLE $) - -set( - GRPC_LIBS - - ${_GRPC_GRPCPP} - ${_PROTOBUF_LIBPROTOBUF} - gRPC::grpc++_reflection -) diff --git a/services/lib/docker/build_service.sh b/services/lib/docker/build_service.sh deleted file mode 100755 --- a/services/lib/docker/build_service.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash - -set -e - -# Allow scripts to be called from anywhere -SCRIPT_DIR=$(cd "$(dirname "$0")"; pwd -P) - -# folly hack - https://github.com/facebook/folly/pull/1231 -sed -i 's/#if __has_include()/#if __has_include()/g' \ - /usr/local/include/folly/detail/Demangle.h - -rm -rf cmake/build -mkdir -p cmake/build - -"${SCRIPT_DIR}"/build_sources.sh diff --git a/services/lib/docker/build_sources.sh b/services/lib/docker/build_sources.sh deleted file mode 100755 --- a/services/lib/docker/build_sources.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env bash - -set -e - -NPROC=0 - -NPROC=$(nproc 2> /dev/null || echo 1) -if [[ $NPROC -eq 1 ]]; then - NPROC=$(sysctl -n hw.physicalcpu 2> /dev/null || echo 1) -fi - -echo "building the server (nproc=$NPROC)..." - -pushd cmake/build -# gtest is not installed, avoid building test suites -cmake ../.. -DBUILD_TESTING=OFF -make -j "$NPROC" - -popd - -echo "success - server built" diff --git a/services/lib/docker/install_corrosion.sh b/services/lib/docker/install_corrosion.sh deleted file mode 100755 --- a/services/lib/docker/install_corrosion.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash -set -e - -cd /tmp - -git clone --recurse-submodules -b v0.2.1 --single-branch https://github.com/corrosion-rs/corrosion.git -pushd corrosion -cmake -S. -Bbuild -DCMAKE_BUILD_TYPE=Release -cmake --build build --config Release -cmake --install build --config Release - -popd # corrosion -rm -rf corrosion diff --git a/services/lib/docker/proto_codegen.sh b/services/lib/docker/proto_codegen.sh deleted file mode 100755 --- a/services/lib/docker/proto_codegen.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env bash - -set -e - -echo "generating files from protos..." - -for PROTO_FILE in ./protos/*; do - protoc -I=./protos --cpp_out=_generated --grpc_out=_generated --plugin=protoc-gen-grpc="$(which grpc_cpp_plugin)" ./"$PROTO_FILE" -done - -echo "success - code generated from protos" diff --git a/services/lib/docker/run_service.sh b/services/lib/docker/run_service.sh deleted file mode 100755 --- a/services/lib/docker/run_service.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash - -set -e - -EXE_PATH="./cmake/build/bin" - -EXE=$(find $EXE_PATH -mindepth 1 -maxdepth 1 -not -path '*/.*') -EXES=$(wc -l <<< "$EXE") - -if [[ $EXES -ne 1 ]]; then - echo "there should be exactly one executable of a service, $EXES found"; - exit 1; -fi - -"$EXE" diff --git a/services/lib/docker/run_tests.sh b/services/lib/docker/run_tests.sh deleted file mode 100755 --- a/services/lib/docker/run_tests.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -set -e - -pushd cmake/build -make test ARGS="-V" -popd # cmake/build diff --git a/services/lib/src/BaseReactor.h b/services/lib/src/BaseReactor.h deleted file mode 100644 --- a/services/lib/src/BaseReactor.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include "ReactorStatusHolder.h" - -#include - -#include - -namespace comm { -namespace network { -namespace reactor { - -class BaseReactor { -public: - // Returns a status holder that consists of: - // - reactor's state, - // - status of the operation that's being performed by this reactor. - virtual std::shared_ptr getStatusHolder() = 0; - // Should be called when we plan to terminate the connection for some reason - // either with a success or a failure status. - // Receives a status indicating a result of the reactor's operation. - virtual void terminate(const grpc::Status &status) = 0; - // Validates current values of the reactor's fields. - virtual void validate() = 0; - // Should be called when `OnDone` is called. gRPC calls `OnDone` when there - // are not going to be more rpc operations. - virtual void doneCallback() = 0; - // Should be called when `terminate` is called. - virtual void terminateCallback() = 0; -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/CMakeLists.txt b/services/lib/src/CMakeLists.txt deleted file mode 100644 --- a/services/lib/src/CMakeLists.txt +++ /dev/null @@ -1,62 +0,0 @@ -project(comm-services-common) -cmake_minimum_required(VERSION 3.4) -set(CMAKE_CXX_STANDARD 14) - -include(GNUInstallDirs) - -# Export reactors -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/client-base-reactors) -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/server-base-reactors) - -file(GLOB COMMON_HDRS - ${CMAKE_CURRENT_SOURCE_DIR}/*.h -) - -file(GLOB COMMON_SRCS - ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp -) - -add_library(comm-services-common - ${COMMON_HDRS} - ${COMMON_SRCS} -) - -find_package(AWSSDK REQUIRED COMPONENTS core dynamodb) -find_package(Boost 1.40 COMPONENTS program_options REQUIRED) -find_package(Protobuf REQUIRED) -find_package(glog REQUIRED) -find_package(gRPC REQUIRED) - -target_link_libraries(comm-services-common - glog::glog - gRPC::grpc++ - ${AWSSDK_LINK_LIBRARIES} - ${Boost_LIBRARIES} -) - -target_include_directories(comm-services-common - PUBLIC - $ - $ -) - -install(TARGETS comm-services-common EXPORT comm-services-common-export - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT comm-services-common - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT comm-services-common - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT comm-services-common -) - -install(FILES ${COMMON_HDRS} DESTINATION include) - -set(_pname ${PROJECT_NAME}) -export(TARGETS comm-services-common - NAMESPACE comm-services-common:: - FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake -) - -# For installation -install(EXPORT comm-services-common-export - FILE comm-services-common-targets.cmake - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-services-common - NAMESPACE comm-services-common:: -) diff --git a/services/lib/src/DatabaseEntitiesTools.h b/services/lib/src/DatabaseEntitiesTools.h deleted file mode 100644 --- a/services/lib/src/DatabaseEntitiesTools.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#include "Item.h" - -#include -#include - -namespace comm { -namespace network { -namespace database { - -template std::shared_ptr createItemByType() { - static_assert(std::is_base_of::value, "T must inherit from Item"); - return std::make_shared(); -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h deleted file mode 100644 --- a/services/lib/src/DatabaseManagerBase.h +++ /dev/null @@ -1,56 +0,0 @@ -#pragma once - -#include "DatabaseEntitiesTools.h" -#include "DynamoDBTools.h" - -#include -#include -#include - -#include - -namespace comm { -namespace network { -namespace database { - -// this class should be thread-safe in case any shared resources appear -class DatabaseManagerBase { -protected: - void innerPutItem( - std::shared_ptr item, - const Aws::DynamoDB::Model::PutItemRequest &request); - - template - std::shared_ptr - innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request); - - void innerRemoveItem(const Item &item); - void innerBatchWriteItem( - const std::string &tableName, - const size_t &chunkSize, - const size_t &backoffFirstRetryDelay, - const size_t &maxBackoffTime, - std::vector &writeRequests); -}; - -template -std::shared_ptr DatabaseManagerBase::innerFindItem( - Aws::DynamoDB::Model::GetItemRequest &request) { - std::shared_ptr item = createItemByType(); - request.SetTableName(item->getTableName()); - const Aws::DynamoDB::Model::GetItemOutcome &outcome = - getDynamoDBClient()->GetItem(request); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } - const AttributeValues &outcomeItem = outcome.GetResult().GetItem(); - if (!outcomeItem.size()) { - return nullptr; - } - item->assignItemFromDatabase(outcomeItem); - return item; -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp deleted file mode 100644 --- a/services/lib/src/DatabaseManagerBase.cpp +++ /dev/null @@ -1,104 +0,0 @@ -#include "DatabaseManagerBase.h" - -#include "Item.h" - -#include -#include -#include -#include -#include - -#include -#include -#include - -namespace comm { -namespace network { -namespace database { - -void DatabaseManagerBase::innerPutItem( - std::shared_ptr item, - const Aws::DynamoDB::Model::PutItemRequest &request) { - const Aws::DynamoDB::Model::PutItemOutcome outcome = - getDynamoDBClient()->PutItem(request); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } -} - -void DatabaseManagerBase::innerRemoveItem(const Item &item) { - Aws::DynamoDB::Model::DeleteItemRequest request; - request.SetTableName(item.getTableName()); - PrimaryKeyDescriptor pk = item.getPrimaryKeyDescriptor(); - PrimaryKeyValue primaryKeyValue = item.getPrimaryKeyValue(); - request.AddKey( - pk.partitionKey, - Aws::DynamoDB::Model::AttributeValue(primaryKeyValue.partitionKey)); - if (pk.sortKey != nullptr && primaryKeyValue.sortKey != nullptr) { - request.AddKey( - *pk.sortKey, - Aws::DynamoDB::Model::AttributeValue(*primaryKeyValue.sortKey)); - } - - const Aws::DynamoDB::Model::DeleteItemOutcome &outcome = - getDynamoDBClient()->DeleteItem(request); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } -} - -void DatabaseManagerBase::innerBatchWriteItem( - const std::string &tableName, - const size_t &chunkSize, - const size_t &backoffFirstRetryDelay, - const size_t &maxBackoffTime, - std::vector &writeRequests) { - // Split write requests to chunks by chunkSize size and write - // them by batch - Aws::DynamoDB::Model::BatchWriteItemOutcome outcome; - std::vector writeRequestsChunk; - std::vector::iterator chunkPositionStart, - chunkPositionEnd; - for (size_t i = 0; i < writeRequests.size(); i += chunkSize) { - chunkPositionStart = writeRequests.begin() + i; - chunkPositionEnd = - writeRequests.begin() + std::min(writeRequests.size(), i + chunkSize); - writeRequestsChunk = std::vector( - chunkPositionStart, chunkPositionEnd); - - Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest; - writeBatchRequest.AddRequestItems(tableName, writeRequestsChunk); - outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } - - size_t delayRetry, delayMs, jitterMs; - while (!outcome.GetResult().GetUnprocessedItems().empty()) { - if (delayMs == maxBackoffTime) { - throw std::runtime_error( - "InnerBatchWriteItem error: maximum wait time to put unprocessed " - "items to DynamoDB is exceeded."); - } - jitterMs = std::rand() % 99 + 1; - delayRetry++; - delayMs = std::min( - size_t(backoffFirstRetryDelay * std::pow(2, delayRetry) + jitterMs), - maxBackoffTime); - LOG(INFO) << "Waiting for a backoff " << delayMs - << "ms delay before putting unprocessed items from batch write " - "to DynamoDB"; - std::this_thread::sleep_for(std::chrono::milliseconds(delayMs)); - writeBatchRequest.SetRequestItems( - outcome.GetResult().GetUnprocessedItems()); - outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest); - if (!outcome.IsSuccess()) { - throw std::runtime_error(outcome.GetError().GetMessage()); - } - } - } -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/lib/src/DynamoDBTools.h b/services/lib/src/DynamoDBTools.h deleted file mode 100644 --- a/services/lib/src/DynamoDBTools.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include -#include - -#include - -namespace comm { -namespace network { - -std::unique_ptr getDynamoDBClient(); - -} // namespace network -} // namespace comm diff --git a/services/lib/src/DynamoDBTools.cpp b/services/lib/src/DynamoDBTools.cpp deleted file mode 100644 --- a/services/lib/src/DynamoDBTools.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "DynamoDBTools.h" -#include "GlobalConstants.h" -#include "GlobalTools.h" - -namespace comm { -namespace network { - -std::unique_ptr getDynamoDBClient() { - Aws::Client::ClientConfiguration config; - config.region = AWS_REGION; - if (tools::isSandbox()) { - config.endpointOverride = Aws::String("localstack:4566"); - config.scheme = Aws::Http::Scheme::HTTP; - } - return std::make_unique(config); -} - -} // namespace network -} // namespace comm diff --git a/services/lib/src/GlobalConstants.h b/services/lib/src/GlobalConstants.h deleted file mode 100644 --- a/services/lib/src/GlobalConstants.h +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include - -namespace comm { -namespace network { - -// 4MB limit -// WARNING: use keeping in mind that grpc adds its own headers to messages -// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md -// so the message that actually is being sent over the network looks like this -// [Compressed-Flag] [Message-Length] [Message] -// [Compressed-Flag] 1 byte - added by grpc -// [Message-Length] 4 bytes - added by grpc -// [Message] N bytes - actual data -// so for every message we get 5 additional bytes of data -// as mentioned here -// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671 -// grpc stream may contain more than one message -const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024; -const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5; - -const std::string AWS_REGION = "us-east-2"; - -const char ATTACHMENT_DELIMITER = ';'; - -// gRPC Server -const std::string SERVER_LISTEN_ADDRESS = "0.0.0.0:50051"; - -} // namespace network -} // namespace comm diff --git a/services/lib/src/GlobalTools.h b/services/lib/src/GlobalTools.h deleted file mode 100644 --- a/services/lib/src/GlobalTools.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include -#include - -namespace comm { -namespace network { -namespace tools { - -const std::string ID_SEPARATOR = ":"; - -uint64_t getCurrentTimestamp(); - -bool hasEnvFlag(const std::string &flag); - -size_t getNumberOfCores(); - -std::string decorateTableName(const std::string &baseName); - -bool isSandbox(); - -std::string generateUUID(); - -bool validateUUIDv4(const std::string &uuid); - -void InitLogging(const std::string &programName); - -} // namespace tools -} // namespace network -} // namespace comm diff --git a/services/lib/src/GlobalTools.cpp b/services/lib/src/GlobalTools.cpp deleted file mode 100644 --- a/services/lib/src/GlobalTools.cpp +++ /dev/null @@ -1,80 +0,0 @@ -#include "GlobalTools.h" - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace tools { - -uint64_t getCurrentTimestamp() { - using namespace std::chrono; - return duration_cast(system_clock::now().time_since_epoch()) - .count(); -} - -bool hasEnvFlag(const std::string &flag) { - if (std::getenv(flag.c_str()) == nullptr) { - return false; - } - return std::string(std::getenv(flag.c_str())) == "1"; -} - -size_t getNumberOfCores() { - return (size_t)std::max(1u, std::thread::hardware_concurrency()); -} - -std::string decorateTableName(const std::string &baseName) { - std::string suffix = ""; - if (hasEnvFlag("COMM_TEST_SERVICES")) { - suffix = "-test"; - } - return baseName + suffix; -} - -bool isSandbox() { - return hasEnvFlag("COMM_SERVICES_SANDBOX"); -} - -std::string generateUUID() { - thread_local boost::uuids::random_generator random_generator; - return boost::uuids::to_string(random_generator()); -} - -bool validateUUIDv4(const std::string &uuid) { - const std::regex uuidV4RegexFormat( - "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"); - try { - return std::regex_match(uuid, uuidV4RegexFormat); - } catch (const std::exception &e) { - LOG(ERROR) << "Tools: " - << "Got an exception at `validateUUID`: " << e.what(); - return false; - } -} - -void InitLogging(const std::string &programName) { - FLAGS_logtostderr = true; - FLAGS_colorlogtostderr = true; - if (comm::network::tools::isSandbox()) { - // Log levels INFO, WARNING, ERROR, FATAL are 0, 1, 2, 3, respectively - FLAGS_minloglevel = 0; - } else { - FLAGS_minloglevel = 1; - } - google::InitGoogleLogging(programName.c_str()); -} - -} // namespace tools -} // namespace network -} // namespace comm diff --git a/services/lib/src/Item.h b/services/lib/src/Item.h deleted file mode 100644 --- a/services/lib/src/Item.h +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include -#include - -#include -#include - -namespace comm { -namespace network { -namespace database { - -typedef Aws::Map - AttributeValues; - -struct PrimaryKeyBase { - PrimaryKeyBase(const std::string partitionKey) - : partitionKey(partitionKey), sortKey(nullptr) { - } - PrimaryKeyBase(const std::string partitionKey, const std::string sortKey) - : partitionKey(partitionKey), - sortKey(std::make_unique(sortKey)) { - } - - const std::string partitionKey; - std::unique_ptr sortKey; -}; - -struct PrimaryKeyDescriptor : PrimaryKeyBase { - using PrimaryKeyBase::PrimaryKeyBase; -}; - -struct PrimaryKeyValue : PrimaryKeyBase { - using PrimaryKeyBase::PrimaryKeyBase; -}; - -class Item { - virtual void validate() const = 0; - -public: - virtual std::string getTableName() const = 0; - virtual PrimaryKeyDescriptor getPrimaryKeyDescriptor() const = 0; - virtual PrimaryKeyValue getPrimaryKeyValue() const = 0; - virtual void assignItemFromDatabase(const AttributeValues &itemFromDB) = 0; -}; - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/lib/src/ReactorStatusHolder.h b/services/lib/src/ReactorStatusHolder.h deleted file mode 100644 --- a/services/lib/src/ReactorStatusHolder.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -enum class ReactorState { - NONE = 0, - RUNNING = 1, - TERMINATED = 2, - DONE = 3, -}; - -class ReactorStatusHolder { -private: - grpc::Status status = grpc::Status::OK; - std::mutex statusAccessMutex; - -public: - std::atomic state = {ReactorState::NONE}; - - grpc::Status getStatus() { - const std::unique_lock lock(this->statusAccessMutex); - return this->status; - } - void setStatus(const grpc::Status &status) { - const std::unique_lock lock(this->statusAccessMutex); - this->status = status; - } -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/ThreadPool.h b/services/lib/src/ThreadPool.h deleted file mode 100644 --- a/services/lib/src/ThreadPool.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include "GlobalTools.h" - -#include -#include - -#include -#include - -typedef std::function Task; -typedef std::function)> Callback; - -namespace comm { -namespace network { - -class ThreadPool { - boost::asio::thread_pool pool = - boost::asio::thread_pool(tools::getNumberOfCores()); - - ThreadPool() { - } - - virtual ~ThreadPool() { - } - -public: - static ThreadPool &getInstance() { - static ThreadPool instance; - return instance; - } - - void scheduleWithCallback(Task task, Callback callback) { - boost::asio::post(this->pool, [task, callback]() { - std::unique_ptr err = nullptr; - try { - task(); - } catch (std::exception &e) { - err = std::make_unique(e.what()); - } - callback(std::move(err)); - }); - } -}; - -} // namespace network -} // namespace comm diff --git a/services/lib/src/client-base-reactors/CMakeLists.txt b/services/lib/src/client-base-reactors/CMakeLists.txt deleted file mode 100644 --- a/services/lib/src/client-base-reactors/CMakeLists.txt +++ /dev/null @@ -1,44 +0,0 @@ -project(comm-client-base-reactors CXX C) -cmake_minimum_required(VERSION 3.10) - -include(GNUInstallDirs) - -set(CLIENT_HDRS - ClientWriteReactorBase.h - ClientBidiReactorBase.h - ClientReadReactorBase.h -) - -add_library(comm-client-base-reactors - INTERFACE -) - -target_include_directories(comm-client-base-reactors - INTERFACE - $ - $ -) - -install(TARGETS comm-client-base-reactors - EXPORT comm-client-base-reactors-export - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - COMPONENT comm-client-base-reactors - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - COMPONENT comm-client-base-reactors - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} - COMPONENT comm-client-base-reactors -) - -install(FILES ${COMMON_HDRS} DESTINATION include) - -set(_pname ${PROJECT_NAME}) -export(TARGETS comm-client-base-reactors - NAMESPACE comm-client-base-reactors:: - FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake -) - -install(EXPORT comm-client-base-reactors-export - FILE comm-client-base-reactors-targets.cmake - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-client-base-reactors - NAMESPACE comm-client-base-reactors:: -) diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h deleted file mode 100644 --- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h +++ /dev/null @@ -1,144 +0,0 @@ -#pragma once - -#include "BaseReactor.h" - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -// This is how this type of reactor works: -// - repeat: -// - write a request to the server -// - read a response from the server -// - terminate the connection -template -class ClientBidiReactorBase : public grpc::ClientBidiReactor, - public BaseReactor { - std::shared_ptr statusHolder = - std::make_shared(); - std::shared_ptr response = nullptr; - void nextWrite(); - -protected: - Request request; - -public: - grpc::ClientContext context; - - // this should be called explicitly right after the reactor is created - void start(); - - // these methods come from the BaseReactor(go there for more information) - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - std::shared_ptr getStatusHolder() override; - - // these methods come from gRPC - // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 - void OnWriteDone(bool ok) override; - void OnReadDone(bool ok) override; - void terminate(const grpc::Status &status) override; - void OnDone(const grpc::Status &status) override; - - // - argument request - request that's about to be prepared for the next cycle - // - argument previousResponse - response received during the previous cycle - // (may be nullptr) - // - returns status - if the connection is about to be - // continued, nullptr should be returned. Any other returned value will - // terminate the connection with a given status - virtual std::unique_ptr prepareRequest( - Request &request, - std::shared_ptr previousResponse) = 0; -}; - -template -void ClientBidiReactorBase::nextWrite() { - this->request = Request(); - try { - std::unique_ptr status = - this->prepareRequest(this->request, this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - } catch (std::runtime_error &e) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - return; - } - this->StartWrite(&this->request); -} - -template -void ClientBidiReactorBase::start() { - if (this->statusHolder->state != ReactorState::NONE) { - return; - } - this->statusHolder->state = ReactorState::RUNNING; - this->nextWrite(); - this->StartCall(); -} - -template -void ClientBidiReactorBase::OnWriteDone(bool ok) { - if (this->response == nullptr) { - this->response = std::make_shared(); - } - this->StartRead(&(*this->response)); -} - -template -void ClientBidiReactorBase::OnReadDone(bool ok) { - if (!ok) { - // Ending a connection on the other side results in the `ok` flag being set - // to false. It makes it impossible to detect a failure based just on the - // flag. We should manually check if the data we received is valid - this->terminate(grpc::Status::OK); - return; - } - this->nextWrite(); -} - -template -void ClientBidiReactorBase::terminate( - const grpc::Status &status) { - if (this->statusHolder->getStatus().ok()) { - this->statusHolder->setStatus(status); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - this->StartWritesDone(); - this->statusHolder->state = ReactorState::TERMINATED; -} - -template -void ClientBidiReactorBase::OnDone( - const grpc::Status &status) { - this->statusHolder->state = ReactorState::DONE; - this->terminate(status); - this->doneCallback(); -} - -template -std::shared_ptr -ClientBidiReactorBase::getStatusHolder() { - return this->statusHolder; -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h deleted file mode 100644 --- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h +++ /dev/null @@ -1,121 +0,0 @@ -#pragma once - -#include "BaseReactor.h" - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -// This is how this type of reactor works: -// - send a request to the server -// - read N responses from the server -// - terminate the connection -template -class ClientReadReactorBase : public grpc::ClientReadReactor, - public BaseReactor { - std::shared_ptr statusHolder = - std::make_shared(); - Response response; - -public: - Request request; - grpc::ClientContext context; - - // this should be called explicitly right after the reactor is created - void start(); - - // these methods come from the BaseReactor(go there for more information) - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - std::shared_ptr getStatusHolder() override; - - // these methods come from gRPC - // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 - void OnReadDone(bool ok) override; - void terminate(const grpc::Status &status) override; - void OnDone(const grpc::Status &status) override; - - // - argument response - response from the server that was read during the - // current cycle - // - returns status - if the connection is about to be - // continued, nullptr should be returned. Any other returned value will - // terminate the connection with a given status - virtual std::unique_ptr readResponse(Response &response) = 0; -}; - -template -void ClientReadReactorBase::start() { - if (this->statusHolder->state != ReactorState::NONE) { - return; - } - this->StartRead(&this->response); - if (this->statusHolder->state != ReactorState::RUNNING) { - this->StartCall(); - this->statusHolder->state = ReactorState::RUNNING; - } -} - -template -void ClientReadReactorBase::OnReadDone(bool ok) { - if (!ok) { - // Ending a connection on the other side results in the `ok` flag being set - // to false. It makes it impossible to detect a failure based just on the - // flag. We should manually check if the data we received is valid - this->terminate(grpc::Status::OK); - return; - } - try { - std::unique_ptr status = this->readResponse(this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - } catch (std::runtime_error &e) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - this->StartRead(&this->response); -} - -template -void ClientReadReactorBase::terminate( - const grpc::Status &status) { - if (this->statusHolder->getStatus().ok()) { - this->statusHolder->setStatus(status); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - this->statusHolder->state = ReactorState::TERMINATED; -} - -template -void ClientReadReactorBase::OnDone( - const grpc::Status &status) { - this->statusHolder->state = ReactorState::DONE; - this->terminate(status); - this->doneCallback(); -} - -template -std::shared_ptr -ClientReadReactorBase::getStatusHolder() { - return this->statusHolder; -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h deleted file mode 100644 --- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h +++ /dev/null @@ -1,129 +0,0 @@ -#pragma once - -#include "BaseReactor.h" - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -// This is how this type of reactor works: -// - write N requests to the server -// - terminate the connection -template -class ClientWriteReactorBase : public grpc::ClientWriteReactor, - public BaseReactor { - std::shared_ptr statusHolder = - std::make_shared(); - Request request; - bool initialized = false; - - void nextWrite(); - -public: - Response response; - grpc::ClientContext context; - - // this should be called explicitly right after the reactor is created - void start(); - - // these methods come from the BaseReactor(go there for more information) - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - std::shared_ptr getStatusHolder() override; - - // these methods come from gRPC - // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 - void OnWriteDone(bool ok) override; - void terminate(const grpc::Status &status) override; - void OnDone(const grpc::Status &status) override; - - // - argument request - request that should be edited and is going to be sent - // in the current cycle to the server - // - returns status - if the connection is about to be - // continued, nullptr should be returned. Any other returned value will - // terminate the connection with a given status - virtual std::unique_ptr prepareRequest(Request &request) = 0; -}; - -template -void ClientWriteReactorBase::nextWrite() { - this->request = Request(); - try { - std::unique_ptr status = this->prepareRequest(this->request); - if (status != nullptr) { - this->terminate(*status); - return; - } - } catch (std::runtime_error &e) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - this->StartWrite(&this->request); - if (!this->initialized) { - this->StartCall(); - this->initialized = true; - } -} - -template -void ClientWriteReactorBase::start() { - throw std::runtime_error("this class has not been tested"); - if (this->statusHolder->state != ReactorState::NONE) { - return; - } - this->statusHolder->state = ReactorState::RUNNING; - this->nextWrite(); -} - -template -void ClientWriteReactorBase::OnWriteDone(bool ok) { - if (!ok) { - this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); - return; - } - this->nextWrite(); -} - -template -void ClientWriteReactorBase::terminate( - const grpc::Status &status) { - if (this->statusHolder->getStatus().ok()) { - this->statusHolder->setStatus(status); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - this->statusHolder->state = ReactorState::TERMINATED; - this->StartWritesDone(); -} - -template -void ClientWriteReactorBase::OnDone( - const grpc::Status &status) { - this->statusHolder->state = ReactorState::DONE; - this->terminate(status); - this->doneCallback(); -} - -template -std::shared_ptr -ClientWriteReactorBase::getStatusHolder() { - return this->statusHolder; -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/server-base-reactors/CMakeLists.txt b/services/lib/src/server-base-reactors/CMakeLists.txt deleted file mode 100644 --- a/services/lib/src/server-base-reactors/CMakeLists.txt +++ /dev/null @@ -1,42 +0,0 @@ -project(comm-server-base-reactors CXX C) -cmake_minimum_required(VERSION 3.10) - -include(GNUInstallDirs) - -file(GLOB SERVER_HDRS - ${CMAKE_CURRENT_SOURCE_DIR}/*.h -) - -add_library(comm-server-base-reactors - INTERFACE # There's nothing to "build" with headers, so just export them -) - -target_include_directories(comm-server-base-reactors - INTERFACE - $ - $ -) - -install(TARGETS comm-server-base-reactors - EXPORT comm-server-base-reactors-export - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - COMPONENT comm-server-base-reactors - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - COMPONENT comm-server-base-reactors - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} - COMPONENT comm-server-base-reactors -) - -install(FILES ${COMMON_HDRS} DESTINATION include) - -set(_pname ${PROJECT_NAME}) -export(TARGETS comm-server-base-reactors - NAMESPACE comm-server-base-reactors:: - FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake -) - -install(EXPORT comm-server-base-reactors-export - FILE comm-server-base-reactors-targets.cmake - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-server-base-reactors - NAMESPACE comm-server-base-reactors:: -) diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h deleted file mode 100644 --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ /dev/null @@ -1,212 +0,0 @@ -#pragma once - -#include "BaseReactor.h" -#include "ThreadPool.h" - -#include - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -struct ServerBidiReactorStatus { - grpc::Status status; - bool sendLastResponse; - ServerBidiReactorStatus( - grpc::Status status = grpc::Status::OK, - bool sendLastResponse = false) - : status(status), sendLastResponse(sendLastResponse) { - } -}; - -// This is how this type of reactor works: -// - repeat: -// - read a request from the client -// - write a response to the client -// - terminate the connection -template -class ServerBidiReactorBase : public grpc::ServerBidiReactor, - public BaseReactor { - std::shared_ptr statusHolder = - std::make_shared(); - - std::atomic ongoingPoolTaskCounter{0}; - - Request request; - Response response; - - void beginPoolTask(); - void finishPoolTask(); - -protected: - ServerBidiReactorStatus status; - bool readingAborted = false; - -public: - ServerBidiReactorBase(); - - // these methods come from the BaseReactor(go there for more information) - void terminate(const grpc::Status &status) override; - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - std::shared_ptr getStatusHolder() override; - - // these methods come from gRPC - // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 - void OnDone() override; - void OnReadDone(bool ok) override; - void OnWriteDone(bool ok) override; - - void terminate(ServerBidiReactorStatus status); - ServerBidiReactorStatus getStatus() const; - void setStatus(const ServerBidiReactorStatus &status); - - // - argument request - request that was sent by the client and received by - // the server in the current cycle - // - argument response - response that will be sent to the client in the - // current cycle - // - returns status - if the connection is about to be - // continued, nullptr should be returned. Any other returned value will - // terminate the connection with a given status - virtual std::unique_ptr - handleRequest(Request request, Response *response) = 0; -}; - -template -ServerBidiReactorBase::ServerBidiReactorBase() { - this->statusHolder->state = ReactorState::RUNNING; - this->StartRead(&this->request); -} - -template -void ServerBidiReactorBase::terminate( - const grpc::Status &status) { - this->terminate(ServerBidiReactorStatus(status)); -} - -template -void ServerBidiReactorBase::OnDone() { - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - this->statusHolder->state = ReactorState::DONE; - this->doneCallback(); - }, - [this](std::unique_ptr err) { this->finishPoolTask(); }); -} - -template -void ServerBidiReactorBase::terminate( - ServerBidiReactorStatus status) { - this->setStatus(status); - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - this->terminateCallback(); - this->validate(); - }, - [this](std::unique_ptr err) { - if (err != nullptr) { - this->setStatus(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err)))); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - this->finishPoolTask(); - return; - } - if (this->getStatus().sendLastResponse) { - this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->getStatus().status); - } else { - this->Finish(this->getStatus().status); - } - this->statusHolder->state = ReactorState::TERMINATED; - this->finishPoolTask(); - }); -} - -template -ServerBidiReactorStatus -ServerBidiReactorBase::getStatus() const { - return this->status; -} - -template -void ServerBidiReactorBase::setStatus( - const ServerBidiReactorStatus &status) { - this->status = status; -} - -template -void ServerBidiReactorBase::OnReadDone(bool ok) { - if (!ok) { - this->readingAborted = true; - // Ending a connection on the other side results in the `ok` flag being set - // to false. It makes it impossible to detect a failure based just on the - // flag. We should manually check if the data we received is valid - this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); - return; - } - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - this->response = Response(); - std::unique_ptr status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - }, - [this](std::unique_ptr err) { - if (err != nullptr) { - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, *err))); - } - this->finishPoolTask(); - }); -} - -template -void ServerBidiReactorBase::OnWriteDone(bool ok) { - if (!ok) { - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); - return; - } - this->StartRead(&this->request); -} - -template -std::shared_ptr -ServerBidiReactorBase::getStatusHolder() { - return this->statusHolder; -} - -template -void ServerBidiReactorBase::beginPoolTask() { - this->ongoingPoolTaskCounter++; -} - -template -void ServerBidiReactorBase::finishPoolTask() { - this->ongoingPoolTaskCounter--; - if (!this->ongoingPoolTaskCounter.load() && - this->statusHolder->state == ReactorState::DONE) { - // This looks weird but apparently it is okay to do this. More - // information: - // https://phab.comm.dev/D3246#87890 - delete this; - } -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h deleted file mode 100644 --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ /dev/null @@ -1,156 +0,0 @@ -#pragma once - -#include "BaseReactor.h" -#include "ThreadPool.h" - -#include -#include - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -// This is how this type of reactor works: -// - read N requests from the client -// - write a final response to the client (may be empty) -// - terminate the connection -template -class ServerReadReactorBase : public grpc::ServerReadReactor, - public BaseReactor { - std::shared_ptr statusHolder = - std::make_shared(); - - std::atomic ongoingPoolTaskCounter{0}; - Request request; - - void beginPoolTask(); - void finishPoolTask(); - -protected: - Response *response; - -public: - ServerReadReactorBase(Response *response); - - // these methods come from the BaseReactor(go there for more information) - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - std::shared_ptr getStatusHolder() override; - - // these methods come from gRPC - // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 - void OnReadDone(bool ok) override; - void terminate(const grpc::Status &status) override; - void OnDone() override; - - // - argument request - data read from the client in the current cycle - // - returns status - if the connection is about to be - // continued, nullptr should be returned. Any other returned value will - // terminate the connection with a given status - virtual std::unique_ptr readRequest(Request request) = 0; -}; - -template -ServerReadReactorBase::ServerReadReactorBase( - Response *response) - : response(response) { - this->statusHolder->state = ReactorState::RUNNING; - this->StartRead(&this->request); -} - -template -void ServerReadReactorBase::OnReadDone(bool ok) { - if (!ok) { - // Ending a connection on the other side results in the `ok` flag being set - // to false. It makes it impossible to detect a failure based just on the - // flag. We should manually check if the data we received is valid - this->terminate(grpc::Status::OK); - return; - } - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - std::unique_ptr status = this->readRequest(this->request); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartRead(&this->request); - }, - [this](std::unique_ptr err) { - if (err != nullptr) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); - } - this->finishPoolTask(); - }); -} - -template -void ServerReadReactorBase::terminate( - const grpc::Status &status) { - this->statusHolder->setStatus(status); - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - this->terminateCallback(); - this->validate(); - }, - [this](std::unique_ptr err) { - if (err != nullptr) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, *err)); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state == ReactorState::RUNNING) { - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; - } - this->finishPoolTask(); - }); -} - -template -void ServerReadReactorBase::OnDone() { - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - this->statusHolder->state = ReactorState::DONE; - this->doneCallback(); - }, - [this](std::unique_ptr err) { this->finishPoolTask(); }); -} - -template -std::shared_ptr -ServerReadReactorBase::getStatusHolder() { - return this->statusHolder; -} - -template -void ServerReadReactorBase::beginPoolTask() { - this->ongoingPoolTaskCounter++; -} - -template -void ServerReadReactorBase::finishPoolTask() { - this->ongoingPoolTaskCounter--; - if (!this->ongoingPoolTaskCounter.load() && - this->statusHolder->state == ReactorState::DONE) { - // This looks weird but apparently it is okay to do this. More - // information: - // https://phab.comm.dev/D3246#87890 - delete this; - } -} - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h deleted file mode 100644 --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ /dev/null @@ -1,178 +0,0 @@ -#pragma once - -#include "BaseReactor.h" -#include "ThreadPool.h" - -#include -#include - -#include -#include -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -// This is how this type of reactor works: -// - read a request from the client -// - write N responses to the client -// - terminate the connection -template -class ServerWriteReactorBase : public grpc::ServerWriteReactor, - public BaseReactor { - std::shared_ptr statusHolder = - std::make_shared(); - - std::atomic ongoingPoolTaskCounter{0}; - Response response; - bool initialized = false; - - void nextWrite(); - void beginPoolTask(); - void finishPoolTask(); - -protected: - // this is a const ref since it's not meant to be modified - const Request &request; - -public: - ServerWriteReactorBase(const Request *request); - - // this should be called explicitly right after the reactor is created - void start(); - - // these methods come from the BaseReactor(go there for more information) - void validate() override{}; - void doneCallback() override{}; - void terminateCallback() override{}; - std::shared_ptr getStatusHolder() override; - - // these methods come from gRPC - // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 - virtual void initialize(){}; - void OnWriteDone(bool ok) override; - void terminate(const grpc::Status &status) override; - void OnDone() override; - - // - argument response - should be filled with data that will be sent to the - // client in the current cycle - // - returns status - if the connection is about to be - // continued, nullptr should be returned. Any other returned value will - // terminate the connection with a given status - virtual std::unique_ptr writeResponse(Response *response) = 0; -}; - -template -void ServerWriteReactorBase::terminate( - const grpc::Status &status) { - this->statusHolder->setStatus(status); - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - this->terminateCallback(); - this->validate(); - }, - [this](std::unique_ptr err) { - if (err != nullptr) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, *err)); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state == ReactorState::RUNNING) { - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; - } - this->finishPoolTask(); - }); -} - -template -ServerWriteReactorBase::ServerWriteReactorBase( - const Request *request) - : request(*request) { - // we cannot call this->start() here because it's going to call it on - // the base class, not derived leading to the runtime error of calling - // a pure virtual function - // start has to be exposed as a public function and called explicitly - // to initialize writing -} - -template -void ServerWriteReactorBase::nextWrite() { - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { - if (!this->initialized) { - this->initialize(); - this->initialized = true; - } - this->response = Response(); - std::unique_ptr status = - this->writeResponse(&this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - }, - [this](std::unique_ptr err) { - if (err != nullptr) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); - } - this->finishPoolTask(); - }); -} - -template -void ServerWriteReactorBase::start() { - this->statusHolder->state = ReactorState::RUNNING; - this->nextWrite(); -} - -template -void ServerWriteReactorBase::OnDone() { - this->beginPoolTask(); - ThreadPool::getInstance().scheduleWithCallback( - [this]() { this->doneCallback(); }, - [this](std::unique_ptr err) { this->finishPoolTask(); }); -} - -template -std::shared_ptr -ServerWriteReactorBase::getStatusHolder() { - return this->statusHolder; -} - -template -void ServerWriteReactorBase::OnWriteDone(bool ok) { - if (!ok) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); - return; - } - this->nextWrite(); -} - -template -void ServerWriteReactorBase::beginPoolTask() { - this->ongoingPoolTaskCounter++; -} - -template -void ServerWriteReactorBase::finishPoolTask() { - this->ongoingPoolTaskCounter--; - if (!this->ongoingPoolTaskCounter.load() && - this->statusHolder->state == ReactorState::DONE) { - // This looks weird but apparently it is okay to do this. More - // information: - // https://phab.comm.dev/D3246#87890 - delete this; - } -} - -} // namespace reactor -} // namespace network -} // namespace comm