diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h index e9c83512b..f3c22ed3e 100644 --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -1,143 +1,154 @@ #pragma once #include "BaseReactor.h" +#include "ThreadPool.h" -#include #include +#include #include #include #include +#include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - read a request from the client // - write N responses to the client // - terminate the connection template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { - std::shared_ptr statusHolder = std::make_shared(); + std::shared_ptr statusHolder = + std::make_shared(); Response response; bool initialized = false; void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; public: ServerWriteReactorBase(const Request *request); // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 virtual void initialize(){}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; // - argument response - should be filled with data that will be sent to the // client in the current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr writeResponse(Response *response) = 0; }; template void ServerWriteReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::exception &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->terminateCallback(); + this->validate(); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->statusHolder->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + if (!this->statusHolder->getStatus().ok()) { + LOG(ERROR) << this->statusHolder->getStatus().error_message(); + } + if (this->statusHolder->state != ReactorState::RUNNING) { + return; + } + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; + }); } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { - try { - if (!this->initialized) { - this->initialize(); - this->initialized = true; - } - this->response = Response(); - std::unique_ptr status = this->writeResponse(&this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::exception &e) { - LOG(ERROR) << "error: " << e.what(); - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + if (!this->initialized) { + this->initialize(); + this->initialized = true; + } + this->response = Response(); + std::unique_ptr status = + this->writeResponse(&this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + }, + [this](std::unique_ptr err) { + if (err != nullptr) { + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + }); } template void ServerWriteReactorBase::start() { this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr ServerWriteReactorBase::getStatusHolder() { return this->statusHolder; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm