Page MenuHomePhabricator

D5070.id16705.diff
No OneTemporary

D5070.id16705.diff

diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -102,12 +102,7 @@
this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
request.mutable_logdata()->c_str());
- put_client_blocking_read_cxx(
- this->blobHolder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(this->blobHolder.c_str());
return nullptr;
}
@@ -124,23 +119,13 @@
this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->blobHolder.c_str());
- put_client_blocking_read_cxx(
- this->blobHolder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(this->blobHolder.c_str());
put_client_write_cxx(
this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->hash.c_str());
- rust::String responseStr = put_client_blocking_read_cxx(
- this->blobHolder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably
- // want to delegate performing ops
- // to separate threads in the base
- // reactors
+ rust::String responseStr =
+ put_client_blocking_read_cxx(this->blobHolder.c_str());
// data exists?
if ((bool)tools::charPtrToInt(responseStr.c_str())) {
return std::make_unique<grpc::Status>(grpc::Status::OK);
@@ -149,12 +134,7 @@
this->blobHolder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::move(this->value).c_str());
- put_client_blocking_read_cxx(
- this->blobHolder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(this->blobHolder.c_str());
this->value = "";
} else {
this->persistenceMethod = PersistenceMethod::DB;
diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
--- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h
+++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h
@@ -1,13 +1,15 @@
#pragma once
#include "BaseReactor.h"
+#include "ThreadPool.h"
-#include <grpcpp/grpcpp.h>
#include <glog/logging.h>
+#include <grpcpp/grpcpp.h>
#include <atomic>
#include <memory>
#include <string>
+#include <thread>
namespace comm {
namespace network {
@@ -66,38 +68,46 @@
this->terminate(grpc::Status::OK);
return;
}
- try {
- std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- } catch (std::exception &e) {
- this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- return;
- }
- this->StartRead(&this->request);
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() {
+ std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
+ if (status != nullptr) {
+ this->terminate(*status);
+ return;
+ }
+ this->StartRead(&this->request);
+ },
+ [this](std::unique_ptr<std::string> err) {
+ if (err != nullptr) {
+ this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, *err));
+ }
+ });
}
template <class Request, class Response>
void ServerReadReactorBase<Request, Response>::terminate(
const grpc::Status &status) {
this->statusHolder->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::exception &e) {
- this->statusHolder->setStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
- }
- if (!this->statusHolder->getStatus().ok()) {
- LOG(ERROR) << this->statusHolder->getStatus().error_message();
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- this->Finish(this->statusHolder->getStatus());
- this->statusHolder->state = ReactorState::TERMINATED;
+
+ ThreadPool::getInstance().scheduleWithCallback(
+ [this]() {
+ this->terminateCallback();
+ this->validate();
+ },
+ [this](std::unique_ptr<std::string> err) {
+ if (err != nullptr) {
+ this->statusHolder->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, *err));
+ }
+ if (!this->statusHolder->getStatus().ok()) {
+ LOG(ERROR) << this->statusHolder->getStatus().error_message();
+ }
+ if (this->statusHolder->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->Finish(this->statusHolder->getStatus());
+ this->statusHolder->state = ReactorState::TERMINATED;
+ });
}
template <class Request, class Response>

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 1:13 PM (19 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2582800
Default Alt Text
D5070.id16705.diff (6 KB)

Event Timeline