Page MenuHomePhabricator

D3789.id12042.diff
No OneTemporary

D3789.id12042.diff

diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
--- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
+++ b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <atomic>
@@ -22,11 +24,11 @@
};
template <class Request, class Response>
-class ServerBidiReactorBase
- : public grpc::ServerBidiReactor<Request, Response> {
+class ServerBidiReactorBase : public grpc::ServerBidiReactor<Request, Response>,
+ public BaseReactor {
+ std::shared_ptr<ReactorUtility> utility = std::make_shared<ReactorUtility>(); // never null: ctor, terminate() and OnDone() dereference it; assumes ReactorUtility is default-constructible (confirm in BaseReactor.h)
Request request;
Response response;
- std::atomic<bool> finished = false;
protected:
ServerBidiReactorStatus status;
@@ -35,28 +37,39 @@
public:
ServerBidiReactorBase();
+ void terminate(const grpc::Status &status) override; // plain gRPC status entry point; forwards to the ServerBidiReactorStatus overload
+ void validate() override{}; // empty default hook; invoked from terminate()
+ void doneCallback() override{}; // empty default hook; invoked from OnDone()
+ void terminateCallback() override{}; // empty default hook; invoked from terminate()
+
void OnDone() override;
void OnReadDone(bool ok) override;
void OnWriteDone(bool ok) override;
+ std::shared_ptr<ReactorUtility> getUtility() override; // exposes the shared state holder
void terminate(ServerBidiReactorStatus status);
+ ServerBidiReactorStatus getStatus() const; // returns the stored status by value
+ void setStatus(const ServerBidiReactorStatus &status); // replaces the stored status
virtual std::unique_ptr<ServerBidiReactorStatus>
handleRequest(Request request, Response *response) = 0;
- virtual void initialize(){};
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
- this->initialize();
+ this->utility->state = ReactorState::RUNNING; // dereferences utility, so it must be non-null by this point
this->StartRead(&this->request);
}
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
+ this->terminate(ServerBidiReactorStatus(status)); // wrap the gRPC status and delegate to the ServerBidiReactorStatus overload
+}
+
template <class Request, class Response>
void ServerBidiReactorBase<Request, Response>::OnDone() {
+ this->utility->state = ReactorState::DONE;
this->doneCallback();
// This looks weird but apparently it is okay to do this. More information:
// https://phabricator.ashoat.com/D3246#87890
@@ -66,24 +79,36 @@
template <class Request, class Response>
void ServerBidiReactorBase<Request, Response>::terminate(
ServerBidiReactorStatus status) {
- this->status = status;
+ this->setStatus(status); // record the requested status first; it is replaced below if a hook throws
try {
this->terminateCallback();
this->validate();
} catch (std::runtime_error &e) {
- this->status = ServerBidiReactorStatus(
- grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+ this->setStatus(ServerBidiReactorStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); // a throwing hook turns the result into INTERNAL carrying the exception message
}
- if (this->finished) {
+ if (this->utility->state != ReactorState::RUNNING) { // only a RUNNING reactor is finished; any other state returns without calling Finish
return;
}
- if (this->status.sendLastResponse) {
+ if (this->getStatus().sendLastResponse) { // flag selects StartWriteAndFinish (send response, then finish) over a bare Finish
this->StartWriteAndFinish(
- &this->response, grpc::WriteOptions(), this->status.status);
+ &this->response, grpc::WriteOptions(), this->getStatus().status);
} else {
- this->Finish(this->status.status);
+ this->Finish(this->getStatus().status);
}
- this->finished = true;
+ this->utility->state = ReactorState::TERMINATED; // takes over the role of the removed `finished` flag
+}
+
+template <class Request, class Response>
+ServerBidiReactorStatus
+ServerBidiReactorBase<Request, Response>::getStatus() const {
+ return this->status; // returned by value, so the caller receives a copy
+}
+
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::setStatus(
+ const ServerBidiReactorStatus &status) {
+ this->status = status; // overwrites the status member held by this reactor
}
template <class Request, class Response>
@@ -121,6 +146,12 @@
this->StartRead(&this->request);
}
+template <class Request, class Response>
+std::shared_ptr<ReactorUtility>
+ServerBidiReactorBase<Request, Response>::getUtility() {
+ return this->utility; // copies the shared_ptr; the caller shares the same ReactorUtility object
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
--- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
+++ b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <atomic>
@@ -12,64 +14,37 @@
namespace reactor {
template <class Request, class Response>
-class ServerReadReactorBase : public grpc::ServerReadReactor<Request> {
+class ServerReadReactorBase : public grpc::ServerReadReactor<Request>,
+ public BaseReactor {
+ std::shared_ptr<ReactorUtility> utility = std::make_shared<ReactorUtility>(); // never null: ctor, terminate() and OnDone() dereference it; assumes ReactorUtility is default-constructible (confirm in BaseReactor.h)
Request request;
- std::atomic<bool> finished = false;
-
- void terminate(grpc::Status status);
protected:
Response *response;
- grpc::Status status;
public:
ServerReadReactorBase(Response *response);
- void OnDone() override;
+ void validate() override{}; // empty default hook; invoked from terminate()
+ void doneCallback() override{}; // empty default hook; invoked from OnDone()
+ void terminateCallback() override{}; // empty default hook; invoked from terminate()
+
void OnReadDone(bool ok) override;
+ void terminate(const grpc::Status &status) override; // stores the status in utility and finishes the call if still RUNNING
+ void OnDone() override;
+ std::shared_ptr<ReactorUtility> getUtility() override; // exposes the shared state holder
virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
- virtual void initialize(){};
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::terminate(grpc::Status status) {
- this->status = status;
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
- }
- if (this->finished) {
- return;
- }
- this->Finish(this->status);
- this->finished = true;
-}
-
template <class Request, class Response>
ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
Response *response)
: response(response) {
- this->initialize();
+ this->utility->state = ReactorState::RUNNING; // dereferences utility, so it must be non-null by this point
this->StartRead(&this->request);
}
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnDone() {
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
template <class Request, class Response>
void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
if (!ok) {
@@ -92,6 +67,43 @@
this->StartRead(&this->request);
}
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
+ this->utility->setStatus(status); // record the requested status first; it is replaced below if a hook throws
+ try {
+ this->terminateCallback();
+ this->validate();
+ } catch (std::runtime_error &e) {
+ this->utility->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, e.what())); // a throwing hook turns the result into INTERNAL carrying the exception message
+ }
+ if (!this->utility->getStatus().ok()) { // non-OK statuses are printed to stdout, even when Finish is skipped below
+ std::cout << "error: " << this->utility->getStatus().error_message()
+ << std::endl;
+ }
+ if (this->utility->state != ReactorState::RUNNING) { // only a RUNNING reactor is finished; any other state returns without calling Finish
+ return;
+ }
+ this->Finish(this->utility->getStatus());
+ this->utility->state = ReactorState::TERMINATED; // takes over the role of the removed `finished` flag
+}
+
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::OnDone() {
+ this->utility->state = ReactorState::DONE; // state is set to DONE before the doneCallback hook runs
+ this->doneCallback();
+ // This looks weird but apparently it is okay to do this. More information:
+ // https://phabricator.ashoat.com/D3246#87890
+ delete this; // the reactor destroys itself here; no member may be touched after this line
+}
+
+template <class Request, class Response>
+std::shared_ptr<ReactorUtility>
+ServerReadReactorBase<Request, Response>::getUtility() {
+ return this->utility; // copies the shared_ptr; the caller shares the same ReactorUtility object
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
--- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
+++ b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <atomic>
@@ -12,50 +14,56 @@
namespace reactor {
template <class Request, class Response>
-class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response> {
+class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>,
+ public BaseReactor {
+ std::shared_ptr<ReactorUtility> utility = std::make_shared<ReactorUtility>(); // never null: start() and terminate() dereference it; assumes ReactorUtility is default-constructible (confirm in BaseReactor.h)
Response response;
bool initialized = false;
- std::atomic<bool> finished = false;
- void terminate(grpc::Status status);
void nextWrite();
protected:
// this is a const ref since it's not meant to be modified
const Request &request;
- grpc::Status status;
public:
ServerWriteReactorBase(const Request *request);
void start();
- void OnDone() override;
+
+ void validate() override{}; // empty default hook; invoked from terminate()
+ void doneCallback() override{}; // empty default hook
+ void terminateCallback() override{}; // empty default hook; invoked from terminate()
+
+ virtual void initialize(){};
void OnWriteDone(bool ok) override;
+ void terminate(const grpc::Status &status) override; // marked override to match the sibling reactors' declaration of the same BaseReactor virtual
+ void OnDone() override;
+ std::shared_ptr<ReactorUtility> getUtility() override; // exposes the shared state holder
virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
- virtual void initialize(){};
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::terminate(grpc::Status status) {
- this->status = status;
+void ServerWriteReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
+ this->utility->setStatus(status); // record the requested status first; it is replaced below if a hook throws
try {
this->terminateCallback();
this->validate();
} catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ this->utility->setStatus(
+ grpc::Status(grpc::StatusCode::INTERNAL, e.what())); // a throwing hook turns the result into INTERNAL carrying the exception message
}
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
+ if (!this->utility->getStatus().ok()) { // non-OK statuses are printed to stdout, even when Finish is skipped below
+ std::cout << "error: " << this->utility->getStatus().error_message()
+ << std::endl;
}
- if (this->finished) {
+ if (this->utility->state != ReactorState::RUNNING) { // only a RUNNING reactor is finished; any other state returns without calling Finish
return;
}
- this->Finish(this->status);
- this->finished = true;
+ this->Finish(this->utility->getStatus());
+ this->utility->state = ReactorState::TERMINATED; // takes over the role of the removed `finished` flag
}
template <class Request, class Response>
@@ -91,6 +99,7 @@
template <class Request, class Response>
void ServerWriteReactorBase<Request, Response>::start() {
+ this->utility->state = ReactorState::RUNNING; // set in start() before the first write is issued; dereferences utility, so it must be non-null
this->nextWrite();
}
@@ -102,6 +111,12 @@
delete this;
}
+template <class Request, class Response>
+std::shared_ptr<ReactorUtility>
+ServerWriteReactorBase<Request, Response>::getUtility() {
+ return this->utility; // copies the shared_ptr; the caller shares the same ReactorUtility object
+}
+
template <class Request, class Response>
void ServerWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
if (!ok) {

File Metadata

Mime Type
text/plain
Expires
Sun, Nov 24, 1:25 PM (20 h, 43 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2575975
Default Alt Text
D3789.id12042.diff (11 KB)

Event Timeline