diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h index 3987f3f40..f6a52c7d0 100644 --- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h @@ -1,144 +1,144 @@ #pragma once #include "BaseReactor.h" #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - repeat: // - write a request to the server // - read a response from the server // - terminate the connection template class ClientBidiReactorBase : public grpc::ClientBidiReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); std::shared_ptr response = nullptr; void nextWrite(); protected: Request request; public: grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument request - request that's about to be prepared for the next cycle // - argument previousResponse - response received during the previous cycle // (may be nullptr) // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); } template void ClientBidiReactorBase::start() { if (this->statusHolder->state != ReactorState::NONE) { return; } this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); this->StartCall(); } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } template void ClientBidiReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { - std::cout << "error: " << this->statusHolder->getStatus().error_message() - << std::endl; + LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWritesDone(); this->statusHolder->state = ReactorState::TERMINATED; } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientBidiReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h index c4213b996..bf0eeede3 100644 --- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientReadReactorBase.h @@ -1,121 +1,121 @@ #pragma once #include "BaseReactor.h" #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - send a request to the server // - read N responses from the server // - terminate the connection template class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Response response; public: Request request; grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument response - response from the server that was read during the // current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr readResponse(Response &response) = 0; }; template void ClientReadReactorBase::start() { if (this->statusHolder->state != ReactorState::NONE) { return; } this->StartRead(&this->response); if (this->statusHolder->state != ReactorState::RUNNING) { this->StartCall(); this->statusHolder->state = ReactorState::RUNNING; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartRead(&this->response); } template void ClientReadReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { - std::cout << "error: " << this->statusHolder->getStatus().error_message() - << std::endl; + LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->statusHolder->state = ReactorState::TERMINATED; } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientReadReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h index 308cba23b..5e6317b42 100644 --- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h @@ -1,129 +1,129 @@ #pragma once #include "BaseReactor.h" #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - write N requests to the server // - terminate the connection template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; bool initialized = false; void nextWrite(); public: Response response; grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument request - request that should be edited and is going to be sent // in the current cycle to the server // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr prepareRequest(Request &request) = 0; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::start() { throw std::runtime_error("this class has not been tested"); if (this->statusHolder->state != ReactorState::NONE) { return; } this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { - std::cout << "error: " << this->statusHolder->getStatus().error_message() - << std::endl; + LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->statusHolder->state = ReactorState::TERMINATED; this->StartWritesDone(); } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientWriteReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h index cd2ed0a25..98c6c8cf4 100644 --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -1,121 +1,121 @@ #pragma once #include "BaseReactor.h" #include +#include #include #include #include #include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - read N requests from the client // - write a final response to the client (may be empty) // - terminate the connection template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; protected: Response *response; public: ServerReadReactorBase(Response *response); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; // - argument request - data read from the client in the current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr readRequest(Request request) = 0; }; template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->statusHolder->getStatus().ok()) { - std::cout << "error: " << this->statusHolder->getStatus().error_message() - << std::endl; + LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->Finish(this->statusHolder->getStatus()); this->statusHolder->state = ReactorState::TERMINATED; } template void ServerReadReactorBase::OnDone() { this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr ServerReadReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h index 14205a048..b1bc2ab11 100644 --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -1,144 +1,144 @@ #pragma once #include "BaseReactor.h" #include +#include #include #include #include #include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - read a request from the client // - write N responses to the client // - terminate the connection template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Response response; bool initialized = false; void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; public: ServerWriteReactorBase(const Request *request); // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 virtual void initialize(){}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); void OnDone() override; // - argument response - should be filled with data that will be sent to the // client in the current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr writeResponse(Response *response) = 0; }; template void ServerWriteReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->statusHolder->getStatus().ok()) { - std::cout << "error: " << this->statusHolder->getStatus().error_message() - << std::endl; + LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->Finish(this->statusHolder->getStatus()); this->statusHolder->state = ReactorState::TERMINATED; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; + LOG(ERROR) << "error: " << e.what(); this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::start() { this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr ServerWriteReactorBase::getStatusHolder() { return this->statusHolder; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm