Page MenuHomePhorge

D8244.1767141641.diff
No OneTemporary

Size
57 KB
Referenced Files
None
Subscribers
None

D8244.1767141641.diff

diff --git a/services/lib/cmake-components/folly.cmake b/services/lib/cmake-components/folly.cmake
deleted file mode 100644
--- a/services/lib/cmake-components/folly.cmake
+++ /dev/null
@@ -1,54 +0,0 @@
-add_definitions(
- -DFOLLY_NO_CONFIG=1
- -DFOLLY_HAVE_CLOCK_GETTIME=1
- -DFOLLY_HAVE_MEMRCHR=1
- -DFOLLY_USE_LIBCPP=0
- -DFOLLY_MOBILE=0
-)
-
-set(
- FOLLY_SOURCES
-
- ./lib/folly/folly/detail/Futex.cpp
- ./lib/folly/folly/synchronization/ParkingLot.cpp
- ./lib/folly/folly/lang/SafeAssert.cpp
- ./lib/folly/folly/FileUtil.cpp
- ./lib/folly/folly/Subprocess.cpp
- ./lib/folly/folly/File.cpp
- ./lib/folly/folly/Format.cpp
- ./lib/folly/folly/Conv.cpp
- ./lib/folly/folly/io/IOBuf.cpp
- ./lib/folly/folly/memory/detail/MallocImpl.cpp
- ./lib/folly/folly/ScopeGuard.cpp
- ./lib/folly/folly/hash/SpookyHashV2.cpp
- ./lib/folly/folly/io/IOBufQueue.cpp
- ./lib/folly/folly/lang/Assume.cpp
- ./lib/folly/folly/String.cpp
- ./lib/folly/folly/portability/SysUio.cpp
- ./lib/folly/folly/net/NetOps.cpp
- ./lib/folly/folly/synchronization/Hazptr.cpp
- ./lib/folly/folly/detail/ThreadLocalDetail.cpp
- ./lib/folly/folly/SharedMutex.cpp
- ./lib/folly/folly/concurrency/CacheLocality.cpp
- ./lib/folly/folly/detail/StaticSingletonManager.cpp
- ./lib/folly/folly/executors/ThreadPoolExecutor.cpp
- ./lib/folly/folly/executors/GlobalThreadPoolList.cpp
- ./lib/folly/folly/Demangle.cpp
- ./lib/folly/folly/synchronization/AsymmetricMemoryBarrier.cpp
- ./lib/folly/folly/io/async/Request.cpp
- ./lib/folly/folly/detail/MemoryIdler.cpp
- ./lib/folly/folly/detail/AtFork.cpp
- ./lib/folly/folly/Executor.cpp
- ./lib/folly/folly/lang/CString.cpp
- ./lib/folly/folly/portability/SysMembarrier.cpp
- ./lib/folly/folly/container/detail/F14Table.cpp
- ./lib/folly/folly/detail/UniqueInstance.cpp
- ./lib/folly/folly/executors/QueuedImmediateExecutor.cpp
- ./lib/folly/folly/memory/MallctlHelper.cpp
-)
-
-set(
- FOLLY_INCLUDES
-
- ./lib/folly
-)
diff --git a/services/lib/cmake-components/grpc.cmake b/services/lib/cmake-components/grpc.cmake
deleted file mode 100644
--- a/services/lib/cmake-components/grpc.cmake
+++ /dev/null
@@ -1,25 +0,0 @@
-# Disable naming conventions, as the variable names are determined by upstream
-# cmake-lint: disable=C0103
-
-# protobuf
-set(protobuf_MODULE_COMPATIBLE TRUE)
-find_package(Protobuf CONFIG REQUIRED)
-message(STATUS "Using protobuf ${Protobuf_VERSION}")
-
-set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
-set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
-
-# Find gRPC installation
-find_package(gRPC CONFIG REQUIRED)
-message(STATUS "Using gRPC ${gRPC_VERSION}")
-
-set(_GRPC_GRPCPP gRPC::grpc++)
-set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
-
-set(
- GRPC_LIBS
-
- ${_GRPC_GRPCPP}
- ${_PROTOBUF_LIBPROTOBUF}
- gRPC::grpc++_reflection
-)
diff --git a/services/lib/docker/build_service.sh b/services/lib/docker/build_service.sh
deleted file mode 100755
--- a/services/lib/docker/build_service.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-# Allow scripts to be called from anywhere
-SCRIPT_DIR=$(cd "$(dirname "$0")"; pwd -P)
-
-# folly hack - https://github.com/facebook/folly/pull/1231
-sed -i 's/#if __has_include(<demangle.h>)/#if __has_include(<Demangle.h>)/g' \
- /usr/local/include/folly/detail/Demangle.h
-
-rm -rf cmake/build
-mkdir -p cmake/build
-
-"${SCRIPT_DIR}"/build_sources.sh
diff --git a/services/lib/docker/build_sources.sh b/services/lib/docker/build_sources.sh
deleted file mode 100755
--- a/services/lib/docker/build_sources.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-NPROC=0
-
-NPROC=$(nproc 2> /dev/null || echo 1)
-if [[ $NPROC -eq 1 ]]; then
- NPROC=$(sysctl -n hw.physicalcpu 2> /dev/null || echo 1)
-fi
-
-echo "building the server (nproc=$NPROC)..."
-
-pushd cmake/build
-# gtest is not installed, avoid building test suites
-cmake ../.. -DBUILD_TESTING=OFF
-make -j "$NPROC"
-
-popd
-
-echo "success - server built"
diff --git a/services/lib/docker/install_corrosion.sh b/services/lib/docker/install_corrosion.sh
deleted file mode 100755
--- a/services/lib/docker/install_corrosion.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/usr/bin/env bash
-set -e
-
-cd /tmp
-
-git clone --recurse-submodules -b v0.2.1 --single-branch https://github.com/corrosion-rs/corrosion.git
-pushd corrosion
-cmake -S. -Bbuild -DCMAKE_BUILD_TYPE=Release
-cmake --build build --config Release
-cmake --install build --config Release
-
-popd # corrosion
-rm -rf corrosion
diff --git a/services/lib/docker/proto_codegen.sh b/services/lib/docker/proto_codegen.sh
deleted file mode 100755
--- a/services/lib/docker/proto_codegen.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-echo "generating files from protos..."
-
-for PROTO_FILE in ./protos/*; do
- protoc -I=./protos --cpp_out=_generated --grpc_out=_generated --plugin=protoc-gen-grpc="$(which grpc_cpp_plugin)" ./"$PROTO_FILE"
-done
-
-echo "success - code generated from protos"
diff --git a/services/lib/docker/run_service.sh b/services/lib/docker/run_service.sh
deleted file mode 100755
--- a/services/lib/docker/run_service.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-EXE_PATH="./cmake/build/bin"
-
-EXE=$(find $EXE_PATH -mindepth 1 -maxdepth 1 -not -path '*/.*')
-EXES=$(wc -l <<< "$EXE")
-
-if [[ $EXES -ne 1 ]]; then
- echo "there should be exactly one executable of a service, $EXES found";
- exit 1;
-fi
-
-"$EXE"
diff --git a/services/lib/docker/run_tests.sh b/services/lib/docker/run_tests.sh
deleted file mode 100755
--- a/services/lib/docker/run_tests.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-pushd cmake/build
-make test ARGS="-V"
-popd # cmake/build
diff --git a/services/lib/src/BaseReactor.h b/services/lib/src/BaseReactor.h
deleted file mode 100644
--- a/services/lib/src/BaseReactor.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-
-#include "ReactorStatusHolder.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class BaseReactor {
-public:
- // Returns a status holder that consists of:
- // - reactor's state,
- // - status of the operation that's being performed by this reactor.
- virtual std::shared_ptr<ReactorStatusHolder> getStatusHolder() = 0;
- // Should be called when we plan to terminate the connection for some reason
- // either with a success or a failure status.
- // Receives a status indicating a result of the reactor's operation.
- virtual void terminate(const grpc::Status &status) = 0;
- // Validates current values of the reactor's fields.
- virtual void validate() = 0;
- // Should be called when `OnDone` is called. gRPC calls `OnDone` when there
- // are not going to be more rpc operations.
- virtual void doneCallback() = 0;
- // Should be called when `terminate` is called.
- virtual void terminateCallback() = 0;
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/CMakeLists.txt b/services/lib/src/CMakeLists.txt
deleted file mode 100644
--- a/services/lib/src/CMakeLists.txt
+++ /dev/null
@@ -1,62 +0,0 @@
-project(comm-services-common)
-cmake_minimum_required(VERSION 3.4)
-set(CMAKE_CXX_STANDARD 14)
-
-include(GNUInstallDirs)
-
-# Export reactors
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/client-base-reactors)
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/server-base-reactors)
-
-file(GLOB COMMON_HDRS
- ${CMAKE_CURRENT_SOURCE_DIR}/*.h
-)
-
-file(GLOB COMMON_SRCS
- ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
-)
-
-add_library(comm-services-common
- ${COMMON_HDRS}
- ${COMMON_SRCS}
-)
-
-find_package(AWSSDK REQUIRED COMPONENTS core dynamodb)
-find_package(Boost 1.40 COMPONENTS program_options REQUIRED)
-find_package(Protobuf REQUIRED)
-find_package(glog REQUIRED)
-find_package(gRPC REQUIRED)
-
-target_link_libraries(comm-services-common
- glog::glog
- gRPC::grpc++
- ${AWSSDK_LINK_LIBRARIES}
- ${Boost_LIBRARIES}
-)
-
-target_include_directories(comm-services-common
- PUBLIC
- $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
- $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
-)
-
-install(TARGETS comm-services-common EXPORT comm-services-common-export
- RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT comm-services-common
- LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT comm-services-common
- ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT comm-services-common
-)
-
-install(FILES ${COMMON_HDRS} DESTINATION include)
-
-set(_pname ${PROJECT_NAME})
-export(TARGETS comm-services-common
- NAMESPACE comm-services-common::
- FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake
-)
-
-# For installation
-install(EXPORT comm-services-common-export
- FILE comm-services-common-targets.cmake
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-services-common
- NAMESPACE comm-services-common::
-)
diff --git a/services/lib/src/DatabaseEntitiesTools.h b/services/lib/src/DatabaseEntitiesTools.h
deleted file mode 100644
--- a/services/lib/src/DatabaseEntitiesTools.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-
-#include "Item.h"
-
-#include <memory>
-#include <type_traits>
-
-namespace comm {
-namespace network {
-namespace database {
-
-template <typename T> std::shared_ptr<T> createItemByType() {
- static_assert(std::is_base_of<Item, T>::value, "T must inherit from Item");
- return std::make_shared<T>();
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h
deleted file mode 100644
--- a/services/lib/src/DatabaseManagerBase.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#pragma once
-
-#include "DatabaseEntitiesTools.h"
-#include "DynamoDBTools.h"
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/model/GetItemRequest.h>
-#include <aws/dynamodb/model/PutItemRequest.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-namespace database {
-
-// this class should be thread-safe in case any shared resources appear
-class DatabaseManagerBase {
-protected:
- void innerPutItem(
- std::shared_ptr<Item> item,
- const Aws::DynamoDB::Model::PutItemRequest &request);
-
- template <typename T>
- std::shared_ptr<T>
- innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request);
-
- void innerRemoveItem(const Item &item);
- void innerBatchWriteItem(
- const std::string &tableName,
- const size_t &chunkSize,
- const size_t &backoffFirstRetryDelay,
- const size_t &maxBackoffTime,
- std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests);
-};
-
-template <typename T>
-std::shared_ptr<T> DatabaseManagerBase::innerFindItem(
- Aws::DynamoDB::Model::GetItemRequest &request) {
- std::shared_ptr<T> item = createItemByType<T>();
- request.SetTableName(item->getTableName());
- const Aws::DynamoDB::Model::GetItemOutcome &outcome =
- getDynamoDBClient()->GetItem(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- const AttributeValues &outcomeItem = outcome.GetResult().GetItem();
- if (!outcomeItem.size()) {
- return nullptr;
- }
- item->assignItemFromDatabase(outcomeItem);
- return item;
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp
deleted file mode 100644
--- a/services/lib/src/DatabaseManagerBase.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-#include "DatabaseManagerBase.h"
-
-#include "Item.h"
-
-#include <aws/core/utils/Outcome.h>
-#include <aws/dynamodb/model/BatchWriteItemRequest.h>
-#include <aws/dynamodb/model/BatchWriteItemResult.h>
-#include <aws/dynamodb/model/DeleteItemRequest.h>
-#include <glog/logging.h>
-
-#include <chrono>
-#include <cmath>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace database {
-
-void DatabaseManagerBase::innerPutItem(
- std::shared_ptr<Item> item,
- const Aws::DynamoDB::Model::PutItemRequest &request) {
- const Aws::DynamoDB::Model::PutItemOutcome outcome =
- getDynamoDBClient()->PutItem(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-}
-
-void DatabaseManagerBase::innerRemoveItem(const Item &item) {
- Aws::DynamoDB::Model::DeleteItemRequest request;
- request.SetTableName(item.getTableName());
- PrimaryKeyDescriptor pk = item.getPrimaryKeyDescriptor();
- PrimaryKeyValue primaryKeyValue = item.getPrimaryKeyValue();
- request.AddKey(
- pk.partitionKey,
- Aws::DynamoDB::Model::AttributeValue(primaryKeyValue.partitionKey));
- if (pk.sortKey != nullptr && primaryKeyValue.sortKey != nullptr) {
- request.AddKey(
- *pk.sortKey,
- Aws::DynamoDB::Model::AttributeValue(*primaryKeyValue.sortKey));
- }
-
- const Aws::DynamoDB::Model::DeleteItemOutcome &outcome =
- getDynamoDBClient()->DeleteItem(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-}
-
-void DatabaseManagerBase::innerBatchWriteItem(
- const std::string &tableName,
- const size_t &chunkSize,
- const size_t &backoffFirstRetryDelay,
- const size_t &maxBackoffTime,
- std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests) {
- // Split write requests to chunks by chunkSize size and write
- // them by batch
- Aws::DynamoDB::Model::BatchWriteItemOutcome outcome;
- std::vector<Aws::DynamoDB::Model::WriteRequest> writeRequestsChunk;
- std::vector<Aws::DynamoDB::Model::WriteRequest>::iterator chunkPositionStart,
- chunkPositionEnd;
- for (size_t i = 0; i < writeRequests.size(); i += chunkSize) {
- chunkPositionStart = writeRequests.begin() + i;
- chunkPositionEnd =
- writeRequests.begin() + std::min(writeRequests.size(), i + chunkSize);
- writeRequestsChunk = std::vector<Aws::DynamoDB::Model::WriteRequest>(
- chunkPositionStart, chunkPositionEnd);
-
- Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest;
- writeBatchRequest.AddRequestItems(tableName, writeRequestsChunk);
- outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-
- size_t delayRetry, delayMs, jitterMs;
- while (!outcome.GetResult().GetUnprocessedItems().empty()) {
- if (delayMs == maxBackoffTime) {
- throw std::runtime_error(
- "InnerBatchWriteItem error: maximum wait time to put unprocessed "
- "items to DynamoDB is exceeded.");
- }
- jitterMs = std::rand() % 99 + 1;
- delayRetry++;
- delayMs = std::min(
- size_t(backoffFirstRetryDelay * std::pow(2, delayRetry) + jitterMs),
- maxBackoffTime);
- LOG(INFO) << "Waiting for a backoff " << delayMs
- << "ms delay before putting unprocessed items from batch write "
- "to DynamoDB";
- std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
- writeBatchRequest.SetRequestItems(
- outcome.GetResult().GetUnprocessedItems());
- outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- }
- }
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DynamoDBTools.h b/services/lib/src/DynamoDBTools.h
deleted file mode 100644
--- a/services/lib/src/DynamoDBTools.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/DynamoDBClient.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-
-std::unique_ptr<Aws::DynamoDB::DynamoDBClient> getDynamoDBClient();
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DynamoDBTools.cpp b/services/lib/src/DynamoDBTools.cpp
deleted file mode 100644
--- a/services/lib/src/DynamoDBTools.cpp
+++ /dev/null
@@ -1,19 +0,0 @@
-#include "DynamoDBTools.h"
-#include "GlobalConstants.h"
-#include "GlobalTools.h"
-
-namespace comm {
-namespace network {
-
-std::unique_ptr<Aws::DynamoDB::DynamoDBClient> getDynamoDBClient() {
- Aws::Client::ClientConfiguration config;
- config.region = AWS_REGION;
- if (tools::isSandbox()) {
- config.endpointOverride = Aws::String("localstack:4566");
- config.scheme = Aws::Http::Scheme::HTTP;
- }
- return std::make_unique<Aws::DynamoDB::DynamoDBClient>(config);
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/GlobalConstants.h b/services/lib/src/GlobalConstants.h
deleted file mode 100644
--- a/services/lib/src/GlobalConstants.h
+++ /dev/null
@@ -1,31 +0,0 @@
-#pragma once
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-// 4MB limit
-// WARNING: use keeping in mind that grpc adds its own headers to messages
-// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-// so the message that actually is being sent over the network looks like this
-// [Compressed-Flag] [Message-Length] [Message]
-// [Compressed-Flag] 1 byte - added by grpc
-// [Message-Length] 4 bytes - added by grpc
-// [Message] N bytes - actual data
-// so for every message we get 5 additional bytes of data
-// as mentioned here
-// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
-// grpc stream may contain more than one message
-const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
-const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;
-
-const std::string AWS_REGION = "us-east-2";
-
-const char ATTACHMENT_DELIMITER = ';';
-
-// gRPC Server
-const std::string SERVER_LISTEN_ADDRESS = "0.0.0.0:50051";
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/GlobalTools.h b/services/lib/src/GlobalTools.h
deleted file mode 100644
--- a/services/lib/src/GlobalTools.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include <cstdint>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace tools {
-
-const std::string ID_SEPARATOR = ":";
-
-uint64_t getCurrentTimestamp();
-
-bool hasEnvFlag(const std::string &flag);
-
-size_t getNumberOfCores();
-
-std::string decorateTableName(const std::string &baseName);
-
-bool isSandbox();
-
-std::string generateUUID();
-
-bool validateUUIDv4(const std::string &uuid);
-
-void InitLogging(const std::string &programName);
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/GlobalTools.cpp b/services/lib/src/GlobalTools.cpp
deleted file mode 100644
--- a/services/lib/src/GlobalTools.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-#include "GlobalTools.h"
-
-#include <glog/logging.h>
-#include <openssl/sha.h>
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
-
-#include <algorithm>
-#include <chrono>
-#include <iomanip>
-#include <regex>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace tools {
-
-uint64_t getCurrentTimestamp() {
- using namespace std::chrono;
- return duration_cast<milliseconds>(system_clock::now().time_since_epoch())
- .count();
-}
-
-bool hasEnvFlag(const std::string &flag) {
- if (std::getenv(flag.c_str()) == nullptr) {
- return false;
- }
- return std::string(std::getenv(flag.c_str())) == "1";
-}
-
-size_t getNumberOfCores() {
- return (size_t)std::max(1u, std::thread::hardware_concurrency());
-}
-
-std::string decorateTableName(const std::string &baseName) {
- std::string suffix = "";
- if (hasEnvFlag("COMM_TEST_SERVICES")) {
- suffix = "-test";
- }
- return baseName + suffix;
-}
-
-bool isSandbox() {
- return hasEnvFlag("COMM_SERVICES_SANDBOX");
-}
-
-std::string generateUUID() {
- thread_local boost::uuids::random_generator random_generator;
- return boost::uuids::to_string(random_generator());
-}
-
-bool validateUUIDv4(const std::string &uuid) {
- const std::regex uuidV4RegexFormat(
- "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$");
- try {
- return std::regex_match(uuid, uuidV4RegexFormat);
- } catch (const std::exception &e) {
- LOG(ERROR) << "Tools: "
- << "Got an exception at `validateUUID`: " << e.what();
- return false;
- }
-}
-
-void InitLogging(const std::string &programName) {
- FLAGS_logtostderr = true;
- FLAGS_colorlogtostderr = true;
- if (comm::network::tools::isSandbox()) {
- // Log levels INFO, WARNING, ERROR, FATAL are 0, 1, 2, 3, respectively
- FLAGS_minloglevel = 0;
- } else {
- FLAGS_minloglevel = 1;
- }
- google::InitGoogleLogging(programName.c_str());
-}
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/Item.h b/services/lib/src/Item.h
deleted file mode 100644
--- a/services/lib/src/Item.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/DynamoDBClient.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-typedef Aws::Map<Aws::String, Aws::DynamoDB::Model::AttributeValue>
- AttributeValues;
-
-struct PrimaryKeyBase {
- PrimaryKeyBase(const std::string partitionKey)
- : partitionKey(partitionKey), sortKey(nullptr) {
- }
- PrimaryKeyBase(const std::string partitionKey, const std::string sortKey)
- : partitionKey(partitionKey),
- sortKey(std::make_unique<std::string>(sortKey)) {
- }
-
- const std::string partitionKey;
- std::unique_ptr<std::string> sortKey;
-};
-
-struct PrimaryKeyDescriptor : PrimaryKeyBase {
- using PrimaryKeyBase::PrimaryKeyBase;
-};
-
-struct PrimaryKeyValue : PrimaryKeyBase {
- using PrimaryKeyBase::PrimaryKeyBase;
-};
-
-class Item {
- virtual void validate() const = 0;
-
-public:
- virtual std::string getTableName() const = 0;
- virtual PrimaryKeyDescriptor getPrimaryKeyDescriptor() const = 0;
- virtual PrimaryKeyValue getPrimaryKeyValue() const = 0;
- virtual void assignItemFromDatabase(const AttributeValues &itemFromDB) = 0;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/ReactorStatusHolder.h b/services/lib/src/ReactorStatusHolder.h
deleted file mode 100644
--- a/services/lib/src/ReactorStatusHolder.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#pragma once
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <mutex>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-enum class ReactorState {
- NONE = 0,
- RUNNING = 1,
- TERMINATED = 2,
- DONE = 3,
-};
-
-class ReactorStatusHolder {
-private:
- grpc::Status status = grpc::Status::OK;
- std::mutex statusAccessMutex;
-
-public:
- std::atomic<ReactorState> state = {ReactorState::NONE};
-
- grpc::Status getStatus() {
- const std::unique_lock<std::mutex> lock(this->statusAccessMutex);
- return this->status;
- }
- void setStatus(const grpc::Status &status) {
- const std::unique_lock<std::mutex> lock(this->statusAccessMutex);
- this->status = status;
- }
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/ThreadPool.h b/services/lib/src/ThreadPool.h
deleted file mode 100644
--- a/services/lib/src/ThreadPool.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include "GlobalTools.h"
-
-#include <boost/asio.hpp>
-#include <boost/asio/thread_pool.hpp>
-
-#include <atomic>
-#include <memory>
-
-typedef std::function<void()> Task;
-typedef std::function<void(std::unique_ptr<std::string>)> Callback;
-
-namespace comm {
-namespace network {
-
-class ThreadPool {
- boost::asio::thread_pool pool =
- boost::asio::thread_pool(tools::getNumberOfCores());
-
- ThreadPool() {
- }
-
- virtual ~ThreadPool() {
- }
-
-public:
- static ThreadPool &getInstance() {
- static ThreadPool instance;
- return instance;
- }
-
- void scheduleWithCallback(Task task, Callback callback) {
- boost::asio::post(this->pool, [task, callback]() {
- std::unique_ptr<std::string> err = nullptr;
- try {
- task();
- } catch (std::exception &e) {
- err = std::make_unique<std::string>(e.what());
- }
- callback(std::move(err));
- });
- }
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/client-base-reactors/CMakeLists.txt b/services/lib/src/client-base-reactors/CMakeLists.txt
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/CMakeLists.txt
+++ /dev/null
@@ -1,44 +0,0 @@
-project(comm-client-base-reactors CXX C)
-cmake_minimum_required(VERSION 3.10)
-
-include(GNUInstallDirs)
-
-set(CLIENT_HDRS
- ClientWriteReactorBase.h
- ClientBidiReactorBase.h
- ClientReadReactorBase.h
-)
-
-add_library(comm-client-base-reactors
- INTERFACE
-)
-
-target_include_directories(comm-client-base-reactors
- INTERFACE
- $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
- $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
-)
-
-install(TARGETS comm-client-base-reactors
- EXPORT comm-client-base-reactors-export
- RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
- COMPONENT comm-client-base-reactors
- LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-client-base-reactors
- ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-client-base-reactors
-)
-
-install(FILES ${COMMON_HDRS} DESTINATION include)
-
-set(_pname ${PROJECT_NAME})
-export(TARGETS comm-client-base-reactors
- NAMESPACE comm-client-base-reactors::
- FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake
-)
-
-install(EXPORT comm-client-base-reactors-export
- FILE comm-client-base-reactors-targets.cmake
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-client-base-reactors
- NAMESPACE comm-client-base-reactors::
-)
diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h
+++ /dev/null
@@ -1,144 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - repeat:
-// - write a request to the server
-// - read a response from the server
-// - terminate the connection
-template <class Request, class Response>
-class ClientBidiReactorBase : public grpc::ClientBidiReactor<Request, Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
- std::shared_ptr<Response> response = nullptr;
- void nextWrite();
-
-protected:
- Request request;
-
-public:
- grpc::ClientContext context;
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnWriteDone(bool ok) override;
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone(const grpc::Status &status) override;
-
- // - argument request - request that's about to be prepared for the next cycle
- // - argument previousResponse - response received during the previous cycle
- // (may be nullptr)
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> prepareRequest(
- Request &request,
- std::shared_ptr<Response> previousResponse) = 0;
-};
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::nextWrite() {
- this->request = Request();
- try {
- std::unique_ptr<grpc::Status> status =
- this->prepareRequest(this->request, this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- return;
- }
- this->StartWrite(&this->request);
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::start() {
- if (this->statusHolder->state != ReactorState::NONE) {
- return;
- }
- this->statusHolder->state = ReactorState::RUNNING;
- this->nextWrite();
- this->StartCall();
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (this->response == nullptr) {
- this->response = std::make_shared<Response>();
- }
- this->StartRead(&(*this->response));
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->statusHolder->getStatus().ok()) {
- this->statusHolder->setStatus(status);
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->StartWritesDone();
- this->statusHolder->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::OnDone(
- const grpc::Status &status) {
- this->statusHolder->state = ReactorState::DONE;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ClientBidiReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h
+++ /dev/null
@@ -1,121 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - send a request to the server
-// - read N responses from the server
-// - terminate the connection
-template <class Request, class Response>
-class ClientReadReactorBase : public grpc::ClientReadReactor<Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
- Response response;
-
-public:
- Request request;
- grpc::ClientContext context;
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone(const grpc::Status &status) override;
-
- // - argument response - response from the server that was read during the
- // current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> readResponse(Response &response) = 0;
-};
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::start() {
- if (this->statusHolder->state != ReactorState::NONE) {
- return;
- }
- this->StartRead(&this->response);
- if (this->statusHolder->state != ReactorState::RUNNING) {
- this->StartCall();
- this->statusHolder->state = ReactorState::RUNNING;
- }
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- try {
- std::unique_ptr<grpc::Status> status = this->readResponse(this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->StartRead(&this->response);
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->statusHolder->getStatus().ok()) {
- this->statusHolder->setStatus(status);
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->statusHolder->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::OnDone(
- const grpc::Status &status) {
- this->statusHolder->state = ReactorState::DONE;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ClientReadReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h
+++ /dev/null
@@ -1,129 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - write N requests to the server
-// - terminate the connection
-template <class Request, class Response>
-class ClientWriteReactorBase : public grpc::ClientWriteReactor<Request>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
- Request request;
- bool initialized = false;
-
- void nextWrite();
-
-public:
- Response response;
- grpc::ClientContext context;
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone(const grpc::Status &status) override;
-
- // - argument request - request that should be edited and is going to be sent
- // in the current cycle to the server
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> prepareRequest(Request &request) = 0;
-};
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::nextWrite() {
- this->request = Request();
- try {
- std::unique_ptr<grpc::Status> status = this->prepareRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->StartWrite(&this->request);
- if (!this->initialized) {
- this->StartCall();
- this->initialized = true;
- }
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::start() {
- throw std::runtime_error("this class has not been tested");
- if (this->statusHolder->state != ReactorState::NONE) {
- return;
- }
- this->statusHolder->state = ReactorState::RUNNING;
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error"));
- return;
- }
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->statusHolder->getStatus().ok()) {
- this->statusHolder->setStatus(status);
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->statusHolder->state = ReactorState::TERMINATED;
- this->StartWritesDone();
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::OnDone(
- const grpc::Status &status) {
- this->statusHolder->state = ReactorState::DONE;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ClientWriteReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/server-base-reactors/CMakeLists.txt b/services/lib/src/server-base-reactors/CMakeLists.txt
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/CMakeLists.txt
+++ /dev/null
@@ -1,42 +0,0 @@
-project(comm-server-base-reactors CXX C)
-cmake_minimum_required(VERSION 3.10)
-
-include(GNUInstallDirs)
-
-file(GLOB SERVER_HDRS
- ${CMAKE_CURRENT_SOURCE_DIR}/*.h
-)
-
-add_library(comm-server-base-reactors
- INTERFACE # There's nothing to "build" with headers, so just export them
-)
-
-target_include_directories(comm-server-base-reactors
- INTERFACE
- $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
- $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
-)
-
-install(TARGETS comm-server-base-reactors
- EXPORT comm-server-base-reactors-export
- RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
- COMPONENT comm-server-base-reactors
- LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-server-base-reactors
- ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-server-base-reactors
-)
-
-install(FILES ${COMMON_HDRS} DESTINATION include)
-
-set(_pname ${PROJECT_NAME})
-export(TARGETS comm-server-base-reactors
- NAMESPACE comm-server-base-reactors::
- FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake
-)
-
-install(EXPORT comm-server-base-reactors-export
- FILE comm-server-base-reactors-targets.cmake
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-server-base-reactors
- NAMESPACE comm-server-base-reactors::
-)
diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
+++ /dev/null
@@ -1,212 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-#include "ThreadPool.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-struct ServerBidiReactorStatus {
- grpc::Status status;
- bool sendLastResponse;
- ServerBidiReactorStatus(
- grpc::Status status = grpc::Status::OK,
- bool sendLastResponse = false)
- : status(status), sendLastResponse(sendLastResponse) {
- }
-};
-
-// This is how this type of reactor works:
-// - repeat:
-// - read a request from the client
-// - write a response to the client
-// - terminate the connection
-template <class Request, class Response>
-class ServerBidiReactorBase : public grpc::ServerBidiReactor<Request, Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
-
- std::atomic<int> ongoingPoolTaskCounter{0};
-
- Request request;
- Response response;
-
- void beginPoolTask();
- void finishPoolTask();
-
-protected:
- ServerBidiReactorStatus status;
- bool readingAborted = false;
-
-public:
- ServerBidiReactorBase();
-
- // these methods come from the BaseReactor(go there for more information)
- void terminate(const grpc::Status &status) override;
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnDone() override;
- void OnReadDone(bool ok) override;
- void OnWriteDone(bool ok) override;
-
- void terminate(ServerBidiReactorStatus status);
- ServerBidiReactorStatus getStatus() const;
- void setStatus(const ServerBidiReactorStatus &status);
-
- // - argument request - request that was sent by the client and received by
- // the server in the current cycle
- // - argument response - response that will be sent to the client in the
- // current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<ServerBidiReactorStatus>
- handleRequest(Request request, Response *response) = 0;
-};
-
-template <class Request, class Response>
-ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
- this->statusHolder->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->terminate(ServerBidiReactorStatus(status));
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnDone() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->statusHolder->state = ReactorState::DONE;
- this->doneCallback();
- },
- [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- ServerBidiReactorStatus status) {
- this->setStatus(status);
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->terminateCallback();
- this->validate();
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->setStatus(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err))));
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- this->finishPoolTask();
- return;
- }
- if (this->getStatus().sendLastResponse) {
- this->StartWriteAndFinish(
- &this->response, grpc::WriteOptions(), this->getStatus().status);
- } else {
- this->Finish(this->getStatus().status);
- }
- this->statusHolder->state = ReactorState::TERMINATED;
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-ServerBidiReactorStatus
-ServerBidiReactorBase<Request, Response>::getStatus() const {
- return this->status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::setStatus(
- const ServerBidiReactorStatus &status) {
- this->status = status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- this->readingAborted = true;
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(ServerBidiReactorStatus(grpc::Status::OK));
- return;
- }
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->response = Response();
- std::unique_ptr<ServerBidiReactorStatus> status =
- this->handleRequest(this->request, &this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, *err)));
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::ABORTED, "write failed")));
- return;
- }
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerBidiReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::beginPoolTask() {
- this->ongoingPoolTaskCounter++;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::finishPoolTask() {
- this->ongoingPoolTaskCounter--;
- if (!this->ongoingPoolTaskCounter.load() &&
- this->statusHolder->state == ReactorState::DONE) {
- // This looks weird but apparently it is okay to do this. More
- // information:
- // https://phab.comm.dev/D3246#87890
- delete this;
- }
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h
+++ /dev/null
@@ -1,156 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-#include "ThreadPool.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - read N requests from the client
-// - write a final response to the client (may be empty)
-// - terminate the connection
-template <class Request, class Response>
-class ServerReadReactorBase : public grpc::ServerReadReactor<Request>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
-
- std::atomic<int> ongoingPoolTaskCounter{0};
- Request request;
-
- void beginPoolTask();
- void finishPoolTask();
-
-protected:
- Response *response;
-
-public:
- ServerReadReactorBase(Response *response);
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone() override;
-
- // - argument request - data read from the client in the current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
-};
-
-template <class Request, class Response>
-ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
- Response *response)
- : response(response) {
- this->statusHolder->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartRead(&this->request);
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->statusHolder->setStatus(status);
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->terminateCallback();
- this->validate();
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state == ReactorState::RUNNING) {
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnDone() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->statusHolder->state = ReactorState::DONE;
- this->doneCallback();
- },
- [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerReadReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::beginPoolTask() {
- this->ongoingPoolTaskCounter++;
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::finishPoolTask() {
- this->ongoingPoolTaskCounter--;
- if (!this->ongoingPoolTaskCounter.load() &&
- this->statusHolder->state == ReactorState::DONE) {
- // This looks weird but apparently it is okay to do this. More
- // information:
- // https://phab.comm.dev/D3246#87890
- delete this;
- }
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
+++ /dev/null
@@ -1,178 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-#include "ThreadPool.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - read a request from the client
-// - write N responses to the client
-// - terminate the connection
-template <class Request, class Response>
-class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
-
- std::atomic<int> ongoingPoolTaskCounter{0};
- Response response;
- bool initialized = false;
-
- void nextWrite();
- void beginPoolTask();
- void finishPoolTask();
-
-protected:
- // this is a const ref since it's not meant to be modified
- const Request &request;
-
-public:
- ServerWriteReactorBase(const Request *request);
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- virtual void initialize(){};
- void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone() override;
-
- // - argument response - should be filled with data that will be sent to the
- // client in the current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
-};
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->statusHolder->setStatus(status);
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->terminateCallback();
- this->validate();
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state == ReactorState::RUNNING) {
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-ServerWriteReactorBase<Request, Response>::ServerWriteReactorBase(
- const Request *request)
- : request(*request) {
- // we cannot call this->start() here because it's going to call it on
- // the base class, not derived leading to the runtime error of calling
- // a pure virtual function
- // start has to be exposed as a public function and called explicitly
- // to initialize writing
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::nextWrite() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- if (!this->initialized) {
- this->initialize();
- this->initialized = true;
- }
- this->response = Response();
- std::unique_ptr<grpc::Status> status =
- this->writeResponse(&this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::start() {
- this->statusHolder->state = ReactorState::RUNNING;
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnDone() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() { this->doneCallback(); },
- [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerWriteReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
- return;
- }
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::beginPoolTask() {
- this->ongoingPoolTaskCounter++;
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::finishPoolTask() {
- this->ongoingPoolTaskCounter--;
- if (!this->ongoingPoolTaskCounter.load() &&
- this->statusHolder->state == ReactorState::DONE) {
- // This looks weird but apparently it is okay to do this. More
- // information:
- // https://phab.comm.dev/D3246#87890
- delete this;
- }
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 31, 12:40 AM (3 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5871033
Default Alt Text
D8244.1767141641.diff (57 KB)

Event Timeline