Page MenuHomePhabricator

D3787.id11657.diff
No OneTemporary

D3787.id11657.diff

diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
--- a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
+++ b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <atomic>
@@ -23,15 +25,13 @@
template <class Request, class Response>
class ServerBidiReactorBase
- : public grpc::ServerBidiReactor<Request, Response> {
+ : public grpc::ServerBidiReactor<Request, Response>, public BaseReactor {
Request request;
Response response;
- std::atomic<bool> finished = false;
protected:
ServerBidiReactorStatus status;
bool readingAborted = false;
-
public:
ServerBidiReactorBase();
@@ -39,24 +39,21 @@
void OnReadDone(bool ok) override;
void OnWriteDone(bool ok) override;
- void terminate(ServerBidiReactorStatus status);
+ void terminate(ServerBidiReactorStatus status); // ???
virtual std::unique_ptr<ServerBidiReactorStatus>
handleRequest(Request request, Response *response) = 0;
- virtual void initialize(){};
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
- this->initialize();
+ this->state = ReactorState::RUNNING;
this->StartRead(&this->request);
}
template <class Request, class Response>
void ServerBidiReactorBase<Request, Response>::OnDone() {
+ this->state = ReactorState::DONE;
this->doneCallback();
// This looks weird but apparently it is okay to do this. More information:
// https://phabricator.ashoat.com/D3246#87890
@@ -74,7 +71,7 @@
this->status = ServerBidiReactorStatus(
grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
}
- if (this->finished) {
+ if (this->state != ReactorState::RUNNING) {
return;
}
if (this->status.sendLastResponse) {
@@ -83,7 +80,7 @@
} else {
this->Finish(this->status.status);
}
- this->finished = true;
+ this->state = ReactorState::TERMINATED;
}
template <class Request, class Response>
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h
--- a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h
+++ b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <atomic>
@@ -12,64 +14,29 @@
namespace reactor {
template <class Request, class Response>
-class ServerReadReactorBase : public grpc::ServerReadReactor<Request> {
+class ServerReadReactorBase : public grpc::ServerReadReactor<Request>,
+ public BaseReactor {
Request request;
- std::atomic<bool> finished = false;
-
- void terminate(grpc::Status status);
-
protected:
Response *response;
grpc::Status status;
public:
ServerReadReactorBase(Response *response);
-
- void OnDone() override;
void OnReadDone(bool ok) override;
-
+ void terminate(const grpc::Status &status) override;
+ void OnDone() override;
virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
- virtual void initialize(){};
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::terminate(grpc::Status status) {
- this->status = status;
- try {
- this->terminateCallback();
- this->validate();
- } catch (std::runtime_error &e) {
- this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- if (!this->status.ok()) {
- std::cout << "error: " << this->status.error_message() << std::endl;
- }
- if (this->finished) {
- return;
- }
- this->Finish(this->status);
- this->finished = true;
-}
-
template <class Request, class Response>
ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
Response *response)
: response(response) {
- this->initialize();
+ this->state = ReactorState::RUNNING;
this->StartRead(&this->request);
}
-template <class Request, class Response>
-void ServerReadReactorBase<Request, Response>::OnDone() {
- this->doneCallback();
- // This looks weird but apparently it is okay to do this. More information:
- // https://phabricator.ashoat.com/D3246#87890
- delete this;
-}
-
template <class Request, class Response>
void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
if (!ok) {
@@ -92,6 +59,35 @@
this->StartRead(&this->request);
}
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
+ this->status = status;
+ try {
+ this->terminateCallback();
+ this->validate();
+ } catch (std::runtime_error &e) {
+ this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ }
+ if (!this->status.ok()) {
+ std::cout << "error: " << this->status.error_message() << std::endl;
+ }
+ if (this->state != ReactorState::RUNNING) {
+ return;
+ }
+ this->Finish(this->status);
+ this->state = ReactorState::TERMINATED;
+}
+
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::OnDone() {
+ this->state = ReactorState::DONE;
+ this->doneCallback();
+ // This looks weird but apparently it is okay to do this. More information:
+ // https://phabricator.ashoat.com/D3246#87890
+ delete this;
+}
+
} // namespace reactor
} // namespace network
} // namespace comm
diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
--- a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
+++ b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
@@ -1,5 +1,7 @@
#pragma once
+#include "BaseReactor.h"
+
#include <grpcpp/grpcpp.h>
#include <atomic>
@@ -12,14 +14,11 @@
namespace reactor {
template <class Request, class Response>
-class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response> {
+class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response>, public BaseReactor {
Response response;
bool initialized = false;
- std::atomic<bool> finished = false;
- void terminate(grpc::Status status);
void nextWrite();
-
protected:
// this is a const ref since it's not meant to be modified
const Request &request;
@@ -29,18 +28,17 @@
ServerWriteReactorBase(const Request *request);
void start();
- void OnDone() override;
+ virtual void initialize() {}
void OnWriteDone(bool ok) override;
+ void terminate(const grpc::Status &status);
+ void OnDone() override;
virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
- virtual void initialize(){};
- virtual void validate(){};
- virtual void doneCallback(){};
- virtual void terminateCallback(){};
};
template <class Request, class Response>
-void ServerWriteReactorBase<Request, Response>::terminate(grpc::Status status) {
+void ServerWriteReactorBase<Request, Response>::terminate(
+ const grpc::Status &status) {
this->status = status;
try {
this->terminateCallback();
@@ -51,11 +49,11 @@
if (!this->status.ok()) {
std::cout << "error: " << this->status.error_message() << std::endl;
}
- if (this->finished) {
+ if (this->state != ReactorState::RUNNING) {
return;
}
this->Finish(this->status);
- this->finished = true;
+ this->state = ReactorState::TERMINATED;
}
template <class Request, class Response>
@@ -91,6 +89,7 @@
template <class Request, class Response>
void ServerWriteReactorBase<Request, Response>::start() {
+ this->state = ReactorState::RUNNING;
this->nextWrite();
}

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 9:36 AM (21 h, 6 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2592405
Default Alt Text
D3787.id11657.diff (7 KB)

Event Timeline