Page MenuHomePhabricator

D5069.id16655.diff
No OneTemporary

D5069.id16655.diff

diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -71,24 +71,14 @@
this->holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
this->holder.c_str());
- put_client_blocking_read_cxx(
- this->holder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(this->holder.c_str());
put_client_write_cxx(
this->holder.c_str(),
tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
this->dataHash.c_str());
- rust::String responseStr = put_client_blocking_read_cxx(
- this->holder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably
- // want to delegate performing ops
- // to separate threads in the base
- // reactors
+ rust::String responseStr =
+ put_client_blocking_read_cxx(this->holder.c_str());
// data exists?
if ((bool)tools::charPtrToInt(responseStr.c_str())) {
return std::make_unique<ServerBidiReactorStatus>(
@@ -102,15 +92,10 @@
}
put_client_write_cxx(
this->holder.c_str(),
- tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
+ tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
std::string(std::move(*request.mutable_newcompactionchunk()))
.c_str());
- put_client_blocking_read_cxx(
- this->holder.c_str()); // todo this should be avoided
- // (blocking); we should be able to
- // ignore responses; we probably want to
- // delegate performing ops to separate
- // threads in the base reactors
+ put_client_blocking_read_cxx(this->holder.c_str());
return nullptr;
}
diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
--- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
+++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h
@@ -1,12 +1,14 @@
#pragma once
#include "BaseReactor.h"
+#include "Worker.h"
#include <grpcpp/grpcpp.h>
#include <atomic>
#include <memory>
#include <string>
+#include <thread>
namespace comm {
namespace network {
@@ -95,23 +97,27 @@
void ServerBidiReactorBase<Request, Response>::terminate(
ServerBidiReactorStatus status) {
this->setStatus(status);
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::exception &e) {
- this->setStatus(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what())));
- }
- if (this->statusHolder->state != ReactorState::RUNNING) {
- return;
- }
- if (this->getStatus().sendLastResponse) {
- this->StartWriteAndFinish(
- &this->response, grpc::WriteOptions(), this->getStatus().status);
- } else {
- this->Finish(this->getStatus().status);
- }
- this->statusHolder->state = ReactorState::TERMINATED;
+ Worker::getInstance().scheduleWithCallback(
+ [this]() {
+ this->terminateCallback();
+ this->validate();
+ },
+ [this](std::unique_ptr<std::string> err) {
+ if (err != nullptr) {
+ this->setStatus(ServerBidiReactorStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, std::string(*err))));
+ }
+ if (this->statusHolder->state != ReactorState::RUNNING) {
+ return;
+ }
+ if (this->getStatus().sendLastResponse) {
+ this->StartWriteAndFinish(
+ &this->response, grpc::WriteOptions(), this->getStatus().status);
+ } else {
+ this->Finish(this->getStatus().status);
+ }
+ this->statusHolder->state = ReactorState::TERMINATED;
+ });
}
template <class Request, class Response>
@@ -136,19 +142,23 @@
this->terminate(ServerBidiReactorStatus(grpc::Status::OK));
return;
}
- try {
- this->response = Response();
- std::unique_ptr<ServerBidiReactorStatus> status =
- this->handleRequest(this->request, &this->response);
- if (status != nullptr) {
- this->terminate(*status);
- return;
- }
- this->StartWrite(&this->response);
- } catch (std::exception &e) {
- this->terminate(ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what())));
- }
+ Worker::getInstance().scheduleWithCallback(
+ [this]() {
+ this->response = Response();
+ std::unique_ptr<ServerBidiReactorStatus> status =
+ this->handleRequest(this->request, &this->response);
+ if (status != nullptr) {
+ this->terminate(*status);
+ return;
+ }
+ this->StartWrite(&this->response);
+ },
+ [this](std::unique_ptr<std::string> err) {
+ if (err != nullptr) {
+ this->terminate(ServerBidiReactorStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, *err)));
+ }
+ });
}
template <class Request, class Response>

File Metadata

Mime Type
text/plain
Expires
Mon, Nov 25, 10:23 PM (21 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2581143
Default Alt Text
D5069.id16655.diff (5 KB)

Event Timeline