// Patch for services/lib/src/server-base-reactors/ServerWriteReactorBase.h.
// Note: in this copy of the patch the line breaks have been collapsed into
// spaces, so hunk headers (@@ ... @@) and the +/- markers appear inline.
//
// What the patch changes:
//  * Includes: adds "ThreadPool.h" and <thread>, and moves <grpcpp/grpcpp.h>
//    after <glog/logging.h>.
//  * ServerWriteReactorBase::statusHolder: the initializer is only re-wrapped
//    onto two lines; the expression is unchanged.
//  * terminate(): statusHolder->setStatus(status) is still called directly.
//    The rest of the old body is handed to
//    ThreadPool::getInstance().scheduleWithCallback(task, callback):
//      - task:     terminateCallback(), then validate().
//      - callback: receives std::unique_ptr<std::string> err. If err is
//                  non-null the status becomes INTERNAL with *err; a non-ok
//                  status is logged; if state != RUNNING it returns;
//                  otherwise it calls Finish(status) and sets state to
//                  TERMINATED.
//  * nextWrite(): the old try/catch body becomes the scheduled task
//    (initialize once, reset response, writeResponse(), then either
//    terminate(*status) or StartWrite()). The callback calls
//    terminate(INTERNAL, *err) when err is non-null. The old catch block also
//    logged "error: " << e.what() before terminating; the new callback does
//    not emit that line.
//
// Review notes (assumptions to confirm, not established by this patch):
//  * NOTE(review): scheduleWithCallback() is declared in ThreadPool.h, which
//    is not shown. Presumably err is non-null exactly when the task threw,
//    mirroring the removed try/catch -- confirm.
//  * NOTE(review): every lambda captures `this` by pointer. Whether the
//    reactor is guaranteed to outlive the scheduled task and callback is not
//    visible here -- confirm against the reactor's OnDone()/destruction path.
//  * NOTE(review): the terminate() callback writes statusHolder->state through
//    `this` after Finish(); this ordering already existed before the patch.
//    Confirm the reactor cannot be destroyed between those two statements.
//  * NOTE(review): the nextWrite() task can itself call terminate(), which
//    now schedules a second task/callback pair -- confirm this nesting is
//    acceptable for the pool.
//  * NOTE(review): nothing in the hunks shown uses a name from <thread>;
//    confirm the include is needed.
//
diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -1,13 +1,15 @@ #pragma once #include "BaseReactor.h" +#include "ThreadPool.h" -#include <grpcpp/grpcpp.h> #include <glog/logging.h> +#include <grpcpp/grpcpp.h> #include <atomic> #include <memory> #include <string> +#include <thread> namespace comm { namespace network { @@ -20,7 +22,8 @@ template <class Request, class Response> class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>, public BaseReactor { - std::shared_ptr<ReactorStatusHolder> statusHolder = std::make_shared<ReactorStatusHolder>(); + std::shared_ptr<ReactorStatusHolder> statusHolder = + std::make_shared<ReactorStatusHolder>(); Response response; bool initialized = false; @@ -61,21 +64,25 @@ void ServerWriteReactorBase<Request, Response>::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); - try { - this->terminateCallback(); - this->validate(); - } catch (std::exception &e) { - this->statusHolder->setStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } - if (!this->statusHolder->getStatus().ok()) { - LOG(ERROR) << this->statusHolder->getStatus().error_message(); - } - if (this->statusHolder->state != ReactorState::RUNNING) { - return; - } - this->Finish(this->statusHolder->getStatus()); - this->statusHolder->state = ReactorState::TERMINATED; + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + this->terminateCallback(); + this->validate(); + }, + [this](std::unique_ptr<std::string> err) { + if (err != nullptr) { + this->statusHolder->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + if (!this->statusHolder->getStatus().ok()) { + LOG(ERROR) << this->statusHolder->getStatus().error_message(); + } + if (this->statusHolder->state != 
ReactorState::RUNNING) { + return; + } + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; + }); } template <class Request, class Response> @@ -91,22 +98,26 @@ template <class Request, class Response> void ServerWriteReactorBase<Request, Response>::nextWrite() { - try { - if (!this->initialized) { - this->initialize(); - this->initialized = true; - } - this->response = Response(); - std::unique_ptr<grpc::Status> status = this->writeResponse(&this->response); - if (status != nullptr) { - this->terminate(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::exception &e) { - LOG(ERROR) << "error: " << e.what(); - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } + ThreadPool::getInstance().scheduleWithCallback( + [this]() { + if (!this->initialized) { + this->initialize(); + this->initialized = true; + } + this->response = Response(); + std::unique_ptr<grpc::Status> status = + this->writeResponse(&this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + }, + [this](std::unique_ptr<std::string> err) { + if (err != nullptr) { + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err)); + } + }); } template <class Request, class Response>