Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32526316
D8244.1767141641.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
57 KB
Referenced Files
None
Subscribers
None
D8244.1767141641.diff
View Options
diff --git a/services/lib/cmake-components/folly.cmake b/services/lib/cmake-components/folly.cmake
deleted file mode 100644
--- a/services/lib/cmake-components/folly.cmake
+++ /dev/null
@@ -1,54 +0,0 @@
-add_definitions(
- -DFOLLY_NO_CONFIG=1
- -DFOLLY_HAVE_CLOCK_GETTIME=1
- -DFOLLY_HAVE_MEMRCHR=1
- -DFOLLY_USE_LIBCPP=0
- -DFOLLY_MOBILE=0
-)
-
-set(
- FOLLY_SOURCES
-
- ./lib/folly/folly/detail/Futex.cpp
- ./lib/folly/folly/synchronization/ParkingLot.cpp
- ./lib/folly/folly/lang/SafeAssert.cpp
- ./lib/folly/folly/FileUtil.cpp
- ./lib/folly/folly/Subprocess.cpp
- ./lib/folly/folly/File.cpp
- ./lib/folly/folly/Format.cpp
- ./lib/folly/folly/Conv.cpp
- ./lib/folly/folly/io/IOBuf.cpp
- ./lib/folly/folly/memory/detail/MallocImpl.cpp
- ./lib/folly/folly/ScopeGuard.cpp
- ./lib/folly/folly/hash/SpookyHashV2.cpp
- ./lib/folly/folly/io/IOBufQueue.cpp
- ./lib/folly/folly/lang/Assume.cpp
- ./lib/folly/folly/String.cpp
- ./lib/folly/folly/portability/SysUio.cpp
- ./lib/folly/folly/net/NetOps.cpp
- ./lib/folly/folly/synchronization/Hazptr.cpp
- ./lib/folly/folly/detail/ThreadLocalDetail.cpp
- ./lib/folly/folly/SharedMutex.cpp
- ./lib/folly/folly/concurrency/CacheLocality.cpp
- ./lib/folly/folly/detail/StaticSingletonManager.cpp
- ./lib/folly/folly/executors/ThreadPoolExecutor.cpp
- ./lib/folly/folly/executors/GlobalThreadPoolList.cpp
- ./lib/folly/folly/Demangle.cpp
- ./lib/folly/folly/synchronization/AsymmetricMemoryBarrier.cpp
- ./lib/folly/folly/io/async/Request.cpp
- ./lib/folly/folly/detail/MemoryIdler.cpp
- ./lib/folly/folly/detail/AtFork.cpp
- ./lib/folly/folly/Executor.cpp
- ./lib/folly/folly/lang/CString.cpp
- ./lib/folly/folly/portability/SysMembarrier.cpp
- ./lib/folly/folly/container/detail/F14Table.cpp
- ./lib/folly/folly/detail/UniqueInstance.cpp
- ./lib/folly/folly/executors/QueuedImmediateExecutor.cpp
- ./lib/folly/folly/memory/MallctlHelper.cpp
-)
-
-set(
- FOLLY_INCLUDES
-
- ./lib/folly
-)
diff --git a/services/lib/cmake-components/grpc.cmake b/services/lib/cmake-components/grpc.cmake
deleted file mode 100644
--- a/services/lib/cmake-components/grpc.cmake
+++ /dev/null
@@ -1,25 +0,0 @@
-# Disable naming conventions, as the variable names are determined by upstream
-# cmake-lint: disable=C0103
-
-# protobuf
-set(protobuf_MODULE_COMPATIBLE TRUE)
-find_package(Protobuf CONFIG REQUIRED)
-message(STATUS "Using protobuf ${Protobuf_VERSION}")
-
-set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
-set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
-
-# Find gRPC installation
-find_package(gRPC CONFIG REQUIRED)
-message(STATUS "Using gRPC ${gRPC_VERSION}")
-
-set(_GRPC_GRPCPP gRPC::grpc++)
-set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
-
-set(
- GRPC_LIBS
-
- ${_GRPC_GRPCPP}
- ${_PROTOBUF_LIBPROTOBUF}
- gRPC::grpc++_reflection
-)
diff --git a/services/lib/docker/build_service.sh b/services/lib/docker/build_service.sh
deleted file mode 100755
--- a/services/lib/docker/build_service.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-# Allow scripts to be called from anywhere
-SCRIPT_DIR=$(cd "$(dirname "$0")"; pwd -P)
-
-# folly hack - https://github.com/facebook/folly/pull/1231
-sed -i 's/#if __has_include(<demangle.h>)/#if __has_include(<Demangle.h>)/g' \
- /usr/local/include/folly/detail/Demangle.h
-
-rm -rf cmake/build
-mkdir -p cmake/build
-
-"${SCRIPT_DIR}"/build_sources.sh
diff --git a/services/lib/docker/build_sources.sh b/services/lib/docker/build_sources.sh
deleted file mode 100755
--- a/services/lib/docker/build_sources.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-NPROC=0
-
-NPROC=$(nproc 2> /dev/null || echo 1)
-if [[ $NPROC -eq 1 ]]; then
- NPROC=$(sysctl -n hw.physicalcpu 2> /dev/null || echo 1)
-fi
-
-echo "building the server (nproc=$NPROC)..."
-
-pushd cmake/build
-# gtest is not installed, avoid building test suites
-cmake ../.. -DBUILD_TESTING=OFF
-make -j "$NPROC"
-
-popd
-
-echo "success - server built"
diff --git a/services/lib/docker/install_corrosion.sh b/services/lib/docker/install_corrosion.sh
deleted file mode 100755
--- a/services/lib/docker/install_corrosion.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/usr/bin/env bash
-set -e
-
-cd /tmp
-
-git clone --recurse-submodules -b v0.2.1 --single-branch https://github.com/corrosion-rs/corrosion.git
-pushd corrosion
-cmake -S. -Bbuild -DCMAKE_BUILD_TYPE=Release
-cmake --build build --config Release
-cmake --install build --config Release
-
-popd # corrosion
-rm -rf corrosion
diff --git a/services/lib/docker/proto_codegen.sh b/services/lib/docker/proto_codegen.sh
deleted file mode 100755
--- a/services/lib/docker/proto_codegen.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-echo "generating files from protos..."
-
-for PROTO_FILE in ./protos/*; do
- protoc -I=./protos --cpp_out=_generated --grpc_out=_generated --plugin=protoc-gen-grpc="$(which grpc_cpp_plugin)" ./"$PROTO_FILE"
-done
-
-echo "success - code generated from protos"
diff --git a/services/lib/docker/run_service.sh b/services/lib/docker/run_service.sh
deleted file mode 100755
--- a/services/lib/docker/run_service.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-EXE_PATH="./cmake/build/bin"
-
-EXE=$(find $EXE_PATH -mindepth 1 -maxdepth 1 -not -path '*/.*')
-EXES=$(wc -l <<< "$EXE")
-
-if [[ $EXES -ne 1 ]]; then
- echo "there should be exactly one executable of a service, $EXES found";
- exit 1;
-fi
-
-"$EXE"
diff --git a/services/lib/docker/run_tests.sh b/services/lib/docker/run_tests.sh
deleted file mode 100755
--- a/services/lib/docker/run_tests.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-
-pushd cmake/build
-make test ARGS="-V"
-popd # cmake/build
diff --git a/services/lib/src/BaseReactor.h b/services/lib/src/BaseReactor.h
deleted file mode 100644
--- a/services/lib/src/BaseReactor.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-
-#include "ReactorStatusHolder.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class BaseReactor {
-public:
- // Returns a status holder that consists of:
- // - reactor's state,
- // - status of the operation that's being performed by this reactor.
- virtual std::shared_ptr<ReactorStatusHolder> getStatusHolder() = 0;
- // Should be called when we plan to terminate the connection for some reason
- // either with a success or a failure status.
- // Receives a status indicating a result of the reactor's operation.
- virtual void terminate(const grpc::Status &status) = 0;
- // Validates current values of the reactor's fields.
- virtual void validate() = 0;
- // Should be called when `OnDone` is called. gRPC calls `OnDone` when there
- // are not going to be more rpc operations.
- virtual void doneCallback() = 0;
- // Should be called when `terminate` is called.
- virtual void terminateCallback() = 0;
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/CMakeLists.txt b/services/lib/src/CMakeLists.txt
deleted file mode 100644
--- a/services/lib/src/CMakeLists.txt
+++ /dev/null
@@ -1,62 +0,0 @@
-project(comm-services-common)
-cmake_minimum_required(VERSION 3.4)
-set(CMAKE_CXX_STANDARD 14)
-
-include(GNUInstallDirs)
-
-# Export reactors
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/client-base-reactors)
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/server-base-reactors)
-
-file(GLOB COMMON_HDRS
- ${CMAKE_CURRENT_SOURCE_DIR}/*.h
-)
-
-file(GLOB COMMON_SRCS
- ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
-)
-
-add_library(comm-services-common
- ${COMMON_HDRS}
- ${COMMON_SRCS}
-)
-
-find_package(AWSSDK REQUIRED COMPONENTS core dynamodb)
-find_package(Boost 1.40 COMPONENTS program_options REQUIRED)
-find_package(Protobuf REQUIRED)
-find_package(glog REQUIRED)
-find_package(gRPC REQUIRED)
-
-target_link_libraries(comm-services-common
- glog::glog
- gRPC::grpc++
- ${AWSSDK_LINK_LIBRARIES}
- ${Boost_LIBRARIES}
-)
-
-target_include_directories(comm-services-common
- PUBLIC
- $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
- $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
-)
-
-install(TARGETS comm-services-common EXPORT comm-services-common-export
- RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT comm-services-common
- LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT comm-services-common
- ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT comm-services-common
-)
-
-install(FILES ${COMMON_HDRS} DESTINATION include)
-
-set(_pname ${PROJECT_NAME})
-export(TARGETS comm-services-common
- NAMESPACE comm-services-common::
- FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake
-)
-
-# For installation
-install(EXPORT comm-services-common-export
- FILE comm-services-common-targets.cmake
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-services-common
- NAMESPACE comm-services-common::
-)
diff --git a/services/lib/src/DatabaseEntitiesTools.h b/services/lib/src/DatabaseEntitiesTools.h
deleted file mode 100644
--- a/services/lib/src/DatabaseEntitiesTools.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-
-#include "Item.h"
-
-#include <memory>
-#include <type_traits>
-
-namespace comm {
-namespace network {
-namespace database {
-
-template <typename T> std::shared_ptr<T> createItemByType() {
- static_assert(std::is_base_of<Item, T>::value, "T must inherit from Item");
- return std::make_shared<T>();
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DatabaseManagerBase.h b/services/lib/src/DatabaseManagerBase.h
deleted file mode 100644
--- a/services/lib/src/DatabaseManagerBase.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#pragma once
-
-#include "DatabaseEntitiesTools.h"
-#include "DynamoDBTools.h"
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/model/GetItemRequest.h>
-#include <aws/dynamodb/model/PutItemRequest.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-namespace database {
-
-// this class should be thread-safe in case any shared resources appear
-class DatabaseManagerBase {
-protected:
- void innerPutItem(
- std::shared_ptr<Item> item,
- const Aws::DynamoDB::Model::PutItemRequest &request);
-
- template <typename T>
- std::shared_ptr<T>
- innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request);
-
- void innerRemoveItem(const Item &item);
- void innerBatchWriteItem(
- const std::string &tableName,
- const size_t &chunkSize,
- const size_t &backoffFirstRetryDelay,
- const size_t &maxBackoffTime,
- std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests);
-};
-
-template <typename T>
-std::shared_ptr<T> DatabaseManagerBase::innerFindItem(
- Aws::DynamoDB::Model::GetItemRequest &request) {
- std::shared_ptr<T> item = createItemByType<T>();
- request.SetTableName(item->getTableName());
- const Aws::DynamoDB::Model::GetItemOutcome &outcome =
- getDynamoDBClient()->GetItem(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- const AttributeValues &outcomeItem = outcome.GetResult().GetItem();
- if (!outcomeItem.size()) {
- return nullptr;
- }
- item->assignItemFromDatabase(outcomeItem);
- return item;
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp
deleted file mode 100644
--- a/services/lib/src/DatabaseManagerBase.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-#include "DatabaseManagerBase.h"
-
-#include "Item.h"
-
-#include <aws/core/utils/Outcome.h>
-#include <aws/dynamodb/model/BatchWriteItemRequest.h>
-#include <aws/dynamodb/model/BatchWriteItemResult.h>
-#include <aws/dynamodb/model/DeleteItemRequest.h>
-#include <glog/logging.h>
-
-#include <chrono>
-#include <cmath>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace database {
-
-void DatabaseManagerBase::innerPutItem(
- std::shared_ptr<Item> item,
- const Aws::DynamoDB::Model::PutItemRequest &request) {
- const Aws::DynamoDB::Model::PutItemOutcome outcome =
- getDynamoDBClient()->PutItem(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-}
-
-void DatabaseManagerBase::innerRemoveItem(const Item &item) {
- Aws::DynamoDB::Model::DeleteItemRequest request;
- request.SetTableName(item.getTableName());
- PrimaryKeyDescriptor pk = item.getPrimaryKeyDescriptor();
- PrimaryKeyValue primaryKeyValue = item.getPrimaryKeyValue();
- request.AddKey(
- pk.partitionKey,
- Aws::DynamoDB::Model::AttributeValue(primaryKeyValue.partitionKey));
- if (pk.sortKey != nullptr && primaryKeyValue.sortKey != nullptr) {
- request.AddKey(
- *pk.sortKey,
- Aws::DynamoDB::Model::AttributeValue(*primaryKeyValue.sortKey));
- }
-
- const Aws::DynamoDB::Model::DeleteItemOutcome &outcome =
- getDynamoDBClient()->DeleteItem(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-}
-
-void DatabaseManagerBase::innerBatchWriteItem(
- const std::string &tableName,
- const size_t &chunkSize,
- const size_t &backoffFirstRetryDelay,
- const size_t &maxBackoffTime,
- std::vector<Aws::DynamoDB::Model::WriteRequest> &writeRequests) {
- // Split write requests to chunks by chunkSize size and write
- // them by batch
- Aws::DynamoDB::Model::BatchWriteItemOutcome outcome;
- std::vector<Aws::DynamoDB::Model::WriteRequest> writeRequestsChunk;
- std::vector<Aws::DynamoDB::Model::WriteRequest>::iterator chunkPositionStart,
- chunkPositionEnd;
- for (size_t i = 0; i < writeRequests.size(); i += chunkSize) {
- chunkPositionStart = writeRequests.begin() + i;
- chunkPositionEnd =
- writeRequests.begin() + std::min(writeRequests.size(), i + chunkSize);
- writeRequestsChunk = std::vector<Aws::DynamoDB::Model::WriteRequest>(
- chunkPositionStart, chunkPositionEnd);
-
- Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest;
- writeBatchRequest.AddRequestItems(tableName, writeRequestsChunk);
- outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-
- size_t delayRetry, delayMs, jitterMs;
- while (!outcome.GetResult().GetUnprocessedItems().empty()) {
- if (delayMs == maxBackoffTime) {
- throw std::runtime_error(
- "InnerBatchWriteItem error: maximum wait time to put unprocessed "
- "items to DynamoDB is exceeded.");
- }
- jitterMs = std::rand() % 99 + 1;
- delayRetry++;
- delayMs = std::min(
- size_t(backoffFirstRetryDelay * std::pow(2, delayRetry) + jitterMs),
- maxBackoffTime);
- LOG(INFO) << "Waiting for a backoff " << delayMs
- << "ms delay before putting unprocessed items from batch write "
- "to DynamoDB";
- std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
- writeBatchRequest.SetRequestItems(
- outcome.GetResult().GetUnprocessedItems());
- outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- }
- }
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DynamoDBTools.h b/services/lib/src/DynamoDBTools.h
deleted file mode 100644
--- a/services/lib/src/DynamoDBTools.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/DynamoDBClient.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-
-std::unique_ptr<Aws::DynamoDB::DynamoDBClient> getDynamoDBClient();
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/DynamoDBTools.cpp b/services/lib/src/DynamoDBTools.cpp
deleted file mode 100644
--- a/services/lib/src/DynamoDBTools.cpp
+++ /dev/null
@@ -1,19 +0,0 @@
-#include "DynamoDBTools.h"
-#include "GlobalConstants.h"
-#include "GlobalTools.h"
-
-namespace comm {
-namespace network {
-
-std::unique_ptr<Aws::DynamoDB::DynamoDBClient> getDynamoDBClient() {
- Aws::Client::ClientConfiguration config;
- config.region = AWS_REGION;
- if (tools::isSandbox()) {
- config.endpointOverride = Aws::String("localstack:4566");
- config.scheme = Aws::Http::Scheme::HTTP;
- }
- return std::make_unique<Aws::DynamoDB::DynamoDBClient>(config);
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/GlobalConstants.h b/services/lib/src/GlobalConstants.h
deleted file mode 100644
--- a/services/lib/src/GlobalConstants.h
+++ /dev/null
@@ -1,31 +0,0 @@
-#pragma once
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-// 4MB limit
-// WARNING: use keeping in mind that grpc adds its own headers to messages
-// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-// so the message that actually is being sent over the network looks like this
-// [Compressed-Flag] [Message-Length] [Message]
-// [Compressed-Flag] 1 byte - added by grpc
-// [Message-Length] 4 bytes - added by grpc
-// [Message] N bytes - actual data
-// so for every message we get 5 additional bytes of data
-// as mentioned here
-// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
-// grpc stream may contain more than one message
-const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
-const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;
-
-const std::string AWS_REGION = "us-east-2";
-
-const char ATTACHMENT_DELIMITER = ';';
-
-// gRPC Server
-const std::string SERVER_LISTEN_ADDRESS = "0.0.0.0:50051";
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/GlobalTools.h b/services/lib/src/GlobalTools.h
deleted file mode 100644
--- a/services/lib/src/GlobalTools.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include <cstdint>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace tools {
-
-const std::string ID_SEPARATOR = ":";
-
-uint64_t getCurrentTimestamp();
-
-bool hasEnvFlag(const std::string &flag);
-
-size_t getNumberOfCores();
-
-std::string decorateTableName(const std::string &baseName);
-
-bool isSandbox();
-
-std::string generateUUID();
-
-bool validateUUIDv4(const std::string &uuid);
-
-void InitLogging(const std::string &programName);
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/GlobalTools.cpp b/services/lib/src/GlobalTools.cpp
deleted file mode 100644
--- a/services/lib/src/GlobalTools.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-#include "GlobalTools.h"
-
-#include <glog/logging.h>
-#include <openssl/sha.h>
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
-
-#include <algorithm>
-#include <chrono>
-#include <iomanip>
-#include <regex>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace tools {
-
-uint64_t getCurrentTimestamp() {
- using namespace std::chrono;
- return duration_cast<milliseconds>(system_clock::now().time_since_epoch())
- .count();
-}
-
-bool hasEnvFlag(const std::string &flag) {
- if (std::getenv(flag.c_str()) == nullptr) {
- return false;
- }
- return std::string(std::getenv(flag.c_str())) == "1";
-}
-
-size_t getNumberOfCores() {
- return (size_t)std::max(1u, std::thread::hardware_concurrency());
-}
-
-std::string decorateTableName(const std::string &baseName) {
- std::string suffix = "";
- if (hasEnvFlag("COMM_TEST_SERVICES")) {
- suffix = "-test";
- }
- return baseName + suffix;
-}
-
-bool isSandbox() {
- return hasEnvFlag("COMM_SERVICES_SANDBOX");
-}
-
-std::string generateUUID() {
- thread_local boost::uuids::random_generator random_generator;
- return boost::uuids::to_string(random_generator());
-}
-
-bool validateUUIDv4(const std::string &uuid) {
- const std::regex uuidV4RegexFormat(
- "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$");
- try {
- return std::regex_match(uuid, uuidV4RegexFormat);
- } catch (const std::exception &e) {
- LOG(ERROR) << "Tools: "
- << "Got an exception at `validateUUID`: " << e.what();
- return false;
- }
-}
-
-void InitLogging(const std::string &programName) {
- FLAGS_logtostderr = true;
- FLAGS_colorlogtostderr = true;
- if (comm::network::tools::isSandbox()) {
- // Log levels INFO, WARNING, ERROR, FATAL are 0, 1, 2, 3, respectively
- FLAGS_minloglevel = 0;
- } else {
- FLAGS_minloglevel = 1;
- }
- google::InitGoogleLogging(programName.c_str());
-}
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/Item.h b/services/lib/src/Item.h
deleted file mode 100644
--- a/services/lib/src/Item.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/DynamoDBClient.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-typedef Aws::Map<Aws::String, Aws::DynamoDB::Model::AttributeValue>
- AttributeValues;
-
-struct PrimaryKeyBase {
- PrimaryKeyBase(const std::string partitionKey)
- : partitionKey(partitionKey), sortKey(nullptr) {
- }
- PrimaryKeyBase(const std::string partitionKey, const std::string sortKey)
- : partitionKey(partitionKey),
- sortKey(std::make_unique<std::string>(sortKey)) {
- }
-
- const std::string partitionKey;
- std::unique_ptr<std::string> sortKey;
-};
-
-struct PrimaryKeyDescriptor : PrimaryKeyBase {
- using PrimaryKeyBase::PrimaryKeyBase;
-};
-
-struct PrimaryKeyValue : PrimaryKeyBase {
- using PrimaryKeyBase::PrimaryKeyBase;
-};
-
-class Item {
- virtual void validate() const = 0;
-
-public:
- virtual std::string getTableName() const = 0;
- virtual PrimaryKeyDescriptor getPrimaryKeyDescriptor() const = 0;
- virtual PrimaryKeyValue getPrimaryKeyValue() const = 0;
- virtual void assignItemFromDatabase(const AttributeValues &itemFromDB) = 0;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/ReactorStatusHolder.h b/services/lib/src/ReactorStatusHolder.h
deleted file mode 100644
--- a/services/lib/src/ReactorStatusHolder.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#pragma once
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <mutex>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-enum class ReactorState {
- NONE = 0,
- RUNNING = 1,
- TERMINATED = 2,
- DONE = 3,
-};
-
-class ReactorStatusHolder {
-private:
- grpc::Status status = grpc::Status::OK;
- std::mutex statusAccessMutex;
-
-public:
- std::atomic<ReactorState> state = {ReactorState::NONE};
-
- grpc::Status getStatus() {
- const std::unique_lock<std::mutex> lock(this->statusAccessMutex);
- return this->status;
- }
- void setStatus(const grpc::Status &status) {
- const std::unique_lock<std::mutex> lock(this->statusAccessMutex);
- this->status = status;
- }
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/ThreadPool.h b/services/lib/src/ThreadPool.h
deleted file mode 100644
--- a/services/lib/src/ThreadPool.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include "GlobalTools.h"
-
-#include <boost/asio.hpp>
-#include <boost/asio/thread_pool.hpp>
-
-#include <atomic>
-#include <memory>
-
-typedef std::function<void()> Task;
-typedef std::function<void(std::unique_ptr<std::string>)> Callback;
-
-namespace comm {
-namespace network {
-
-class ThreadPool {
- boost::asio::thread_pool pool =
- boost::asio::thread_pool(tools::getNumberOfCores());
-
- ThreadPool() {
- }
-
- virtual ~ThreadPool() {
- }
-
-public:
- static ThreadPool &getInstance() {
- static ThreadPool instance;
- return instance;
- }
-
- void scheduleWithCallback(Task task, Callback callback) {
- boost::asio::post(this->pool, [task, callback]() {
- std::unique_ptr<std::string> err = nullptr;
- try {
- task();
- } catch (std::exception &e) {
- err = std::make_unique<std::string>(e.what());
- }
- callback(std::move(err));
- });
- }
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/client-base-reactors/CMakeLists.txt b/services/lib/src/client-base-reactors/CMakeLists.txt
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/CMakeLists.txt
+++ /dev/null
@@ -1,44 +0,0 @@
-project(comm-client-base-reactors CXX C)
-cmake_minimum_required(VERSION 3.10)
-
-include(GNUInstallDirs)
-
-set(CLIENT_HDRS
- ClientWriteReactorBase.h
- ClientBidiReactorBase.h
- ClientReadReactorBase.h
-)
-
-add_library(comm-client-base-reactors
- INTERFACE
-)
-
-target_include_directories(comm-client-base-reactors
- INTERFACE
- $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
- $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
-)
-
-install(TARGETS comm-client-base-reactors
- EXPORT comm-client-base-reactors-export
- RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
- COMPONENT comm-client-base-reactors
- LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-client-base-reactors
- ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-client-base-reactors
-)
-
-install(FILES ${COMMON_HDRS} DESTINATION include)
-
-set(_pname ${PROJECT_NAME})
-export(TARGETS comm-client-base-reactors
- NAMESPACE comm-client-base-reactors::
- FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake
-)
-
-install(EXPORT comm-client-base-reactors-export
- FILE comm-client-base-reactors-targets.cmake
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-client-base-reactors
- NAMESPACE comm-client-base-reactors::
-)
diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h
+++ /dev/null
@@ -1,144 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - repeat:
-// - write a request to the server
-// - read a response from the server
-// - terminate the connection
-template <class Request, class Response>
-class ClientBidiReactorBase : public grpc::ClientBidiReactor<Request, Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
- std::shared_ptr<Response> response = nullptr;
- void nextWrite();
-
-protected:
- Request request;
-
-public:
- grpc::ClientContext context;
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnWriteDone(bool ok) override;
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone(const grpc::Status &status) override;
-
- // - argument request - request that's about to be prepared for the next cycle
- // - argument previousResponse - response received during the previous cycle
- // (may be nullptr)
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> prepareRequest(
- Request &request,
- std::shared_ptr<Response> previousResponse) = 0;
-};
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::nextWrite() {
- this->request = Request();
- try {
- std::unique_ptr<grpc::Status> status =
- this->prepareRequest(this->request, this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- return;
- }
- this->StartWrite(&this->request);
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::start() {
- if (this->statusHolder->state != ReactorState::NONE) {
- return;
- }
- this->statusHolder->state = ReactorState::RUNNING;
- this->nextWrite();
- this->StartCall();
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (this->response == nullptr) {
- this->response = std::make_shared<Response>();
- }
- this->StartRead(&(*this->response));
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->statusHolder->getStatus().ok()) {
- this->statusHolder->setStatus(status);
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->StartWritesDone();
- this->statusHolder->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-void ClientBidiReactorBase<Request, Response>::OnDone(
- const grpc::Status &status) {
- this->statusHolder->state = ReactorState::DONE;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ClientBidiReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h
+++ /dev/null
@@ -1,121 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - send a request to the server
-// - read N responses from the server
-// - terminate the connection
-template <class Request, class Response>
-class ClientReadReactorBase : public grpc::ClientReadReactor<Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
- Response response;
-
-public:
- Request request;
- grpc::ClientContext context;
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone(const grpc::Status &status) override;
-
- // - argument response - response from the server that was read during the
- // current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> readResponse(Response &response) = 0;
-};
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::start() {
- if (this->statusHolder->state != ReactorState::NONE) {
- return;
- }
- this->StartRead(&this->response);
- if (this->statusHolder->state != ReactorState::RUNNING) {
- this->StartCall();
- this->statusHolder->state = ReactorState::RUNNING;
- }
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- try {
- std::unique_ptr<grpc::Status> status = this->readResponse(this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->StartRead(&this->response);
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->statusHolder->getStatus().ok()) {
- this->statusHolder->setStatus(status);
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->statusHolder->state = ReactorState::TERMINATED;
-}
-
-template <class Request, class Response>
-void ClientReadReactorBase<Request, Response>::OnDone(
- const grpc::Status &status) {
- this->statusHolder->state = ReactorState::DONE;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ClientReadReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h
deleted file mode 100644
--- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h
+++ /dev/null
@@ -1,129 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - write N requests to the server
-// - terminate the connection
-template <class Request, class Response>
-class ClientWriteReactorBase : public grpc::ClientWriteReactor<Request>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
- Request request;
- bool initialized = false;
-
- void nextWrite();
-
-public:
- Response response;
- grpc::ClientContext context;
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone(const grpc::Status &status) override;
-
- // - argument request - request that should be edited and is going to be sent
- // in the current cycle to the server
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> prepareRequest(Request &request) = 0;
-};
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::nextWrite() {
- this->request = Request();
- try {
- std::unique_ptr<grpc::Status> status = this->prepareRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::runtime_error &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->StartWrite(&this->request);
- if (!this->initialized) {
- this->StartCall();
- this->initialized = true;
- }
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::start() {
- throw std::runtime_error("this class has not been tested");
- if (this->statusHolder->state != ReactorState::NONE) {
- return;
- }
- this->statusHolder->state = ReactorState::RUNNING;
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error"));
- return;
- }
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- if (this->statusHolder->getStatus().ok()) {
- this->statusHolder->setStatus(status);
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->terminateCallback();
- try {
- this->validate();
- } catch (std::runtime_error &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- this->statusHolder->state = ReactorState::TERMINATED;
- this->StartWritesDone();
-}
-
-template <class Request, class Response>
-void ClientWriteReactorBase<Request, Response>::OnDone(
- const grpc::Status &status) {
- this->statusHolder->state = ReactorState::DONE;
- this->terminate(status);
- this->doneCallback();
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ClientWriteReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/server-base-reactors/CMakeLists.txt b/services/lib/src/server-base-reactors/CMakeLists.txt
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/CMakeLists.txt
+++ /dev/null
@@ -1,42 +0,0 @@
-project(comm-server-base-reactors CXX C)
-cmake_minimum_required(VERSION 3.10)
-
-include(GNUInstallDirs)
-
-file(GLOB SERVER_HDRS
- ${CMAKE_CURRENT_SOURCE_DIR}/*.h
-)
-
-add_library(comm-server-base-reactors
- INTERFACE # There's nothing to "build" with headers, so just export them
-)
-
-target_include_directories(comm-server-base-reactors
- INTERFACE
- $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
- $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
-)
-
-install(TARGETS comm-server-base-reactors
- EXPORT comm-server-base-reactors-export
- RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
- COMPONENT comm-server-base-reactors
- LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-server-base-reactors
- ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
- COMPONENT comm-server-base-reactors
-)
-
-install(FILES ${COMMON_HDRS} DESTINATION include)
-
-set(_pname ${PROJECT_NAME})
-export(TARGETS comm-server-base-reactors
- NAMESPACE comm-server-base-reactors::
- FILE ${CMAKE_CURRENT_BINARY_DIR}/cmake/${_pname}/${_pname}-targets.cmake
-)
-
-install(EXPORT comm-server-base-reactors-export
- FILE comm-server-base-reactors-targets.cmake
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/comm-server-base-reactors
- NAMESPACE comm-server-base-reactors::
-)
diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
+++ /dev/null
@@ -1,212 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-#include "ThreadPool.h"
-
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-struct ServerBidiReactorStatus {
- grpc::Status status;
- bool sendLastResponse;
- ServerBidiReactorStatus(
- grpc::Status status = grpc::Status::OK,
- bool sendLastResponse = false)
- : status(status), sendLastResponse(sendLastResponse) {
- }
-};
-
-// This is how this type of reactor works:
-// - repeat:
-// - read a request from the client
-// - write a response to the client
-// - terminate the connection
-template <class Request, class Response>
-class ServerBidiReactorBase : public grpc::ServerBidiReactor<Request, Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
-
- std::atomic<int> ongoingPoolTaskCounter{0};
-
- Request request;
- Response response;
-
- void beginPoolTask();
- void finishPoolTask();
-
-protected:
- ServerBidiReactorStatus status;
- bool readingAborted = false;
-
-public:
- ServerBidiReactorBase();
-
- // these methods come from the BaseReactor(go there for more information)
- void terminate(const grpc::Status &status) override;
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnDone() override;
- void OnReadDone(bool ok) override;
- void OnWriteDone(bool ok) override;
-
- void terminate(ServerBidiReactorStatus status);
- ServerBidiReactorStatus getStatus() const;
- void setStatus(const ServerBidiReactorStatus &status);
-
- // - argument request - request that was sent by the client and received by
- // the server in the current cycle
- // - argument response - response that will be sent to the client in the
- // current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<ServerBidiReactorStatus>
- handleRequest(Request request, Response *response) = 0;
-};
-
-template <class Request, class Response>
-ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
- this->statusHolder->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->terminate(ServerBidiReactorStatus(status));
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnDone() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->statusHolder->state = ReactorState::DONE;
- this->doneCallback();
- },
- [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::terminate(
- ServerBidiReactorStatus status) {
- this->setStatus(status);
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->terminateCallback();
- this->validate();
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->setStatus(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err))));
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- this->finishPoolTask();
- return;
- }
- if (this->getStatus().sendLastResponse) {
- this->StartWriteAndFinish(
- &this->response, grpc::WriteOptions(), this->getStatus().status);
- } else {
- this->Finish(this->getStatus().status);
- }
- this->statusHolder->state = ReactorState::TERMINATED;
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-ServerBidiReactorStatus
-ServerBidiReactorBase<Request, Response>::getStatus() const {
- return this->status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::setStatus(
- const ServerBidiReactorStatus &status) {
- this->status = status;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- this->readingAborted = true;
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(ServerBidiReactorStatus(grpc::Status::OK));
- return;
- }
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->response = Response();
- std::unique_ptr<ServerBidiReactorStatus> status =
- this->handleRequest(this->request, &this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, *err)));
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::ABORTED, "write failed")));
- return;
- }
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerBidiReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::beginPoolTask() {
- this->ongoingPoolTaskCounter++;
-}
-
-template <class Request, class Response>
-void ServerBidiReactorBase<Request, Response>::finishPoolTask() {
- this->ongoingPoolTaskCounter--;
- if (!this->ongoingPoolTaskCounter.load() &&
- this->statusHolder->state == ReactorState::DONE) {
- // This looks weird but apparently it is okay to do this. More
- // information:
- // https://phab.comm.dev/D3246#87890
- delete this;
- }
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h
+++ /dev/null
@@ -1,156 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-#include "ThreadPool.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - read N requests from the client
-// - write a final response to the client (may be empty)
-// - terminate the connection
-template <class Request, class Response>
-class ServerReadReactorBase : public grpc::ServerReadReactor<Request>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
-
- std::atomic<int> ongoingPoolTaskCounter{0};
- Request request;
-
- void beginPoolTask();
- void finishPoolTask();
-
-protected:
- Response *response;
-
-public:
- ServerReadReactorBase(Response *response);
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- void OnReadDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone() override;
-
- // - argument request - data read from the client in the current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
-};
-
-template <class Request, class Response>
-ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
- Response *response)
- : response(response) {
- this->statusHolder->state = ReactorState::RUNNING;
- this->StartRead(&this->request);
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
- if (!ok) {
- // Ending a connection on the other side results in the `ok` flag being set
- // to false. It makes it impossible to detect a failure based just on the
- // flag. We should manually check if the data we received is valid
- this->terminate(grpc::Status::OK);
- return;
- }
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartRead(&this->request);
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->statusHolder->setStatus(status);
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->terminateCallback();
- this->validate();
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state == ReactorState::RUNNING) {
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnDone() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->statusHolder->state = ReactorState::DONE;
- this->doneCallback();
- },
- [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerReadReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::beginPoolTask() {
- this->ongoingPoolTaskCounter++;
-}
-
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::finishPoolTask() {
- this->ongoingPoolTaskCounter--;
- if (!this->ongoingPoolTaskCounter.load() &&
- this->statusHolder->state == ReactorState::DONE) {
- // This looks weird but apparently it is okay to do this. More
- // information:
- // https://phab.comm.dev/D3246#87890
- delete this;
- }
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
deleted file mode 100644
--- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
+++ /dev/null
@@ -1,178 +0,0 @@
-#pragma once
-
-#include "BaseReactor.h"
-#include "ThreadPool.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <atomic>
-#include <memory>
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-// This is how this type of reactor works:
-// - read a request from the client
-// - write N responses to the client
-// - terminate the connection
-template <class Request, class Response>
-class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>,
- public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder =
- std::make_shared<ReactorStatusHolder>();
-
- std::atomic<int> ongoingPoolTaskCounter{0};
- Response response;
- bool initialized = false;
-
- void nextWrite();
- void beginPoolTask();
- void finishPoolTask();
-
-protected:
- // this is a const ref since it's not meant to be modified
- const Request &request;
-
-public:
- ServerWriteReactorBase(const Request *request);
-
- // this should be called explicitly right after the reactor is created
- void start();
-
- // these methods come from the BaseReactor(go there for more information)
- void validate() override{};
- void doneCallback() override{};
- void terminateCallback() override{};
- std::shared_ptr<ReactorStatusHolder> getStatusHolder() override;
-
- // these methods come from gRPC
- // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237
- virtual void initialize(){};
- void OnWriteDone(bool ok) override;
- void terminate(const grpc::Status &status) override;
- void OnDone() override;
-
- // - argument response - should be filled with data that will be sent to the
- // client in the current cycle
- // - returns status - if the connection is about to be
- // continued, nullptr should be returned. Any other returned value will
- // terminate the connection with a given status
- virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
-};
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::terminate(
- const grpc::Status &status) {
- this->statusHolder->setStatus(status);
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- this->terminateCallback();
- this->validate();
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state == ReactorState::RUNNING) {
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-ServerWriteReactorBase<Request, Response>::ServerWriteReactorBase(
- const Request *request)
- : request(*request) {
- // we cannot call this->start() here because it's going to call it on
- // the base class, not derived leading to the runtime error of calling
- // a pure virtual function
- // start has to be exposed as a public function and called explicitly
- // to initialize writing
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::nextWrite() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() {
- if (!this->initialized) {
- this->initialize();
- this->initialized = true;
- }
- this->response = Response();
- std::unique_ptr<grpc::Status> status =
- this->writeResponse(&this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- },
- [this](std::unique_ptr<std::string> err) {
- if (err != nullptr) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
- }
- this->finishPoolTask();
- });
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::start() {
- this->statusHolder->state = ReactorState::RUNNING;
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnDone() {
- this->beginPoolTask();
- ThreadPool::getInstance().scheduleWithCallback(
- [this]() { this->doneCallback(); },
- [this](std::unique_ptr<std::string> err) { this->finishPoolTask(); });
-}
-
-template <class Request, class Response>
-std::shared_ptr<ReactorStatusHolder>
-ServerWriteReactorBase<Request, Response>::getStatusHolder() {
- return this->statusHolder;
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
- if (!ok) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
- return;
- }
- this->nextWrite();
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::beginPoolTask() {
- this->ongoingPoolTaskCounter++;
-}
-
-template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::finishPoolTask() {
- this->ongoingPoolTaskCounter--;
- if (!this->ongoingPoolTaskCounter.load() &&
- this->statusHolder->state == ReactorState::DONE) {
- // This looks weird but apparently it is okay to do this. More
- // information:
- // https://phab.comm.dev/D3246#87890
- delete this;
- }
-}
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 31, 12:40 AM (3 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5871033
Default Alt Text
D8244.1767141641.diff (57 KB)
Attached To
Mode
D8244: [services] Delete shared C++ lib
Attached
Detach File
Event Timeline
Log In to Comment