diff --git a/scripts/get_clang_paths.js b/scripts/get_clang_paths.js index 3391d74e2..1dc7dd582 100644 --- a/scripts/get_clang_paths.js +++ b/scripts/get_clang_paths.js @@ -1,60 +1,64 @@ // @flow const clangPaths = [ { path: 'native/cpp/CommonCpp', extensions: ['h', 'cpp'], excludes: ['_generated'], }, + { + path: 'services/lib/src', + extensions: ['cpp', 'h'], + }, { path: 'services/tunnelbroker/src', extensions: ['cpp', 'h'], }, { path: 'services/tunnelbroker/test', extensions: ['cpp', 'h'], }, { path: 'services/backup/src', extensions: ['cpp', 'h'], }, { path: 'services/backup/test', extensions: ['cpp', 'h'], }, { path: 'services/blob/src', extensions: ['cpp', 'h'], }, { path: 'services/blob/test', extensions: ['cpp', 'h'], }, { path: 'native/android/app/src/cpp', extensions: ['cpp', 'h'], }, { path: 'native/ios/Comm', extensions: ['h', 'm', 'mm'], }, { path: 'native/ios/CommTests', extensions: ['mm'], }, { path: 'native/ios/NotificationService', extensions: ['h', 'm', 'mm'], }, { path: 'native/android/app/src/main/java/app/comm', extensions: ['java'], excludes: ['generated'], }, ]; function getClangPaths() { return clangPaths.map(pathItem => pathItem.path); } module.exports = { getClangPaths, clangPaths }; diff --git a/services/lib/src/BaseReactor.h b/services/lib/src/BaseReactor.h index bb5442744..22826a40d 100644 --- a/services/lib/src/BaseReactor.h +++ b/services/lib/src/BaseReactor.h @@ -1,34 +1,34 @@ #pragma once #include "ReactorStatusHolder.h" #include #include namespace comm { namespace network { namespace reactor { class BaseReactor { public: // Returns a status holder that consists of: // - reactor's state, // - status of the operation that's being performed by this reactor. virtual std::shared_ptr getStatusHolder() = 0; // Should be called when we plan to terminate the connection for some reason // either with a success or a failure status. // Receives a status indicating a result of the reactor's operation. virtual void terminate(const grpc::Status &status) = 0; // Validates current values of the reactor's fields. virtual void validate() = 0; - // Should be called when `OnDone` is called. gRPC calls `OnDone` when there + // Should be called when `OnDone` is called. gRPC calls `OnDone` when there // are not going to be more rpc operations. virtual void doneCallback() = 0; // Should be called when `terminate` is called. virtual void terminateCallback() = 0; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/GlobalTools.cpp b/services/lib/src/GlobalTools.cpp index e91542f95..54585e7e0 100644 --- a/services/lib/src/GlobalTools.cpp +++ b/services/lib/src/GlobalTools.cpp @@ -1,80 +1,80 @@ #include "GlobalTools.h" #include #include #include #include #include +#include #include #include #include #include #include -#include namespace comm { namespace network { namespace tools { uint64_t getCurrentTimestamp() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()) .count(); } bool hasEnvFlag(const std::string &flag) { if (std::getenv(flag.c_str()) == nullptr) { return false; } return std::string(std::getenv(flag.c_str())) == "1"; } size_t getNumberOfCores() { return (size_t)std::max(1u, std::thread::hardware_concurrency()); } std::string decorateTableName(const std::string &baseName) { std::string suffix = ""; if (hasEnvFlag("COMM_TEST_SERVICES")) { suffix = "-test"; } return baseName + suffix; } bool isSandbox() { return hasEnvFlag("COMM_SERVICES_SANDBOX"); } std::string generateUUID() { thread_local boost::uuids::random_generator random_generator; return boost::uuids::to_string(random_generator()); } bool validateUUIDv4(const std::string &uuid) { const std::regex uuidV4RegexFormat( "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"); try { return std::regex_match(uuid, uuidV4RegexFormat); } catch (const std::exception &e) { LOG(ERROR) << "Tools: " << "Got an exception at `validateUUID`: " << e.what(); return false; } } void InitLogging(const std::string &programName) { FLAGS_logtostderr = true; FLAGS_colorlogtostderr = true; if (comm::network::tools::isSandbox()) { // Log levels INFO, WARNING, ERROR, FATAL are 0, 1, 2, 3, respectively FLAGS_minloglevel = 0; } else { FLAGS_minloglevel = 1; } google::InitGoogleLogging(programName.c_str()); } } // namespace tools } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h index f6a52c7d0..5ef47297b 100644 --- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h @@ -1,144 +1,144 @@ #pragma once #include "BaseReactor.h" -#include #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - repeat: // - write a request to the server // - read a response from the server // - terminate the connection template class ClientBidiReactorBase : public grpc::ClientBidiReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); std::shared_ptr response = nullptr; void nextWrite(); protected: Request request; public: grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument request - request that's about to be prepared for the next cycle // - argument previousResponse - response received during the previous cycle // (may be nullptr) // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); } template void ClientBidiReactorBase::start() { if (this->statusHolder->state != ReactorState::NONE) { return; } this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); this->StartCall(); } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } template void ClientBidiReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWritesDone(); this->statusHolder->state = ReactorState::TERMINATED; } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientBidiReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h index bf0eeede3..b987f48b7 100644 --- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientReadReactorBase.h @@ -1,121 +1,121 @@ #pragma once #include "BaseReactor.h" -#include #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - send a request to the server // - read N responses from the server // - terminate the connection template class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Response response; public: Request request; grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument response - response from the server that was read during the // current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr readResponse(Response &response) = 0; }; template void ClientReadReactorBase::start() { if (this->statusHolder->state != ReactorState::NONE) { return; } this->StartRead(&this->response); if (this->statusHolder->state != ReactorState::RUNNING) { this->StartCall(); this->statusHolder->state = ReactorState::RUNNING; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartRead(&this->response); } template void ClientReadReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->statusHolder->state = ReactorState::TERMINATED; } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientReadReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h index 5e6317b42..8d10d4a07 100644 --- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h @@ -1,129 +1,129 @@ #pragma once #include "BaseReactor.h" -#include #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - write N requests to the server // - terminate the connection template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; bool initialized = false; void nextWrite(); public: Response response; grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument request - request that should be edited and is going to be sent // in the current cycle to the server // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr prepareRequest(Request &request) = 0; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::start() { throw std::runtime_error("this class has not been tested"); if (this->statusHolder->state != ReactorState::NONE) { return; } this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->statusHolder->state = ReactorState::TERMINATED; this->StartWritesDone(); } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientWriteReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm