Page MenuHomePhorge

D5071.1767366180.diff
No OneTemporary

Size
3 KB
Referenced Files
None
Subscribers
None

D5071.1767366180.diff

diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
--- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
+++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h
@@ -1,13 +1,15 @@
#pragma once
#include "BaseReactor.h"
+#include "ThreadPool.h"
-#include <grpcpp/grpcpp.h>
#include <glog/logging.h>
+#include <grpcpp/grpcpp.h>
#include <atomic>
#include <memory>
#include <string>
+#include <thread>
namespace comm {
namespace network {
@@ -20,7 +22,8 @@
template <class Request, class Response>
class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>,
public BaseReactor {
- std::shared_ptr<ReactorStatusHolder> statusHolder = std::make_shared<ReactorStatusHolder>();
+ std::shared_ptr<ReactorStatusHolder> statusHolder =
+ std::make_shared<ReactorStatusHolder>();
Response response;
bool initialized = false;
@@ -61,21 +64,25 @@
void ServerWriteReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
this->statusHolder->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::exception &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() {
+ this->terminateCallback();
+ this->validate();
+ },
+ [this](std::unique_ptr<std::string> err) {
+ if (err != nullptr) {
+ this->statusHolder->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, *err));
+ }
+ if (!this->statusHolder->getStatus().ok()) {
+ LOG(ERROR) << this->statusHolder->getStatus().error_message();
+ }
+ if (this->statusHolder->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->Finish(this->statusHolder->getStatus());
+ this->statusHolder->state = ReactorState::TERMINATED;
+ });
}
template <class Request, class Response>
@@ -91,22 +98,26 @@
template <class Request, class Response>
void ServerWriteReactorBase<Request, Response>::nextWrite() {
- try {
- if (!this->initialized) {
- this->initialize();
- this->initialized = true;
- }
- this->response = Response();
- std::unique_ptr<grpc::Status> status = this->writeResponse(&this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- } catch (std::exception &e) {
- LOG(ERROR) << "error: " << e.what();
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() {
+ if (!this->initialized) {
+ this->initialize();
+ this->initialized = true;
+ }
+ this->response = Response();
+ std::unique_ptr<grpc::Status> status =
+ this->writeResponse(&this->response);
+ if (status != nullptr) {
+ this->terminate(*status);
+ return;
+ }
+ this->StartWrite(&this->response);
+ },
+ [this](std::unique_ptr<std::string> err) {
+ if (err != nullptr) {
+ this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
+ }
+ });
}
template <class Request, class Response>

File Metadata

Mime Type
text/plain
Expires
Fri, Jan 2, 3:03 PM (8 h, 9 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5876462
Default Alt Text
D5071.1767366180.diff (3 KB)

Event Timeline