diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h --- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,11 +1,13 @@ #pragma once +#include "BaseReactor.h" + #include +#include #include #include #include -#include namespace comm { namespace network { @@ -23,15 +25,13 @@ template class ServerBidiReactorBase - : public grpc::ServerBidiReactor { + : public grpc::ServerBidiReactor, public BaseReactor { Request request; Response response; - std::atomic finished = false; protected: ServerBidiReactorStatus status; bool readingAborted = false; - public: ServerBidiReactorBase(); @@ -39,24 +39,21 @@ void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; - void terminate(ServerBidiReactorStatus status); + void terminate(ServerBidiReactorStatus status); // ??? virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; - virtual void initialize(){}; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template ServerBidiReactorBase::ServerBidiReactorBase() { - this->initialize(); + this->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerBidiReactorBase::OnDone() { + this->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 @@ -74,7 +71,7 @@ this->status = ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (this->finished) { + if (this->state != ReactorState::RUNNING) { return; } if (this->status.sendLastResponse) { @@ -83,7 +80,7 @@ } else { this->Finish(this->status.status); } - this->finished = true; + this->state = ReactorState::TERMINATED; } template diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h --- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,75 +1,42 @@ #pragma once +#include "BaseReactor.h" + #include +#include #include #include #include -#include namespace comm { namespace network { namespace reactor { template -class ServerReadReactorBase : public grpc::ServerReadReactor { +class ServerReadReactorBase : public grpc::ServerReadReactor, + public BaseReactor { Request request; - std::atomic finished = false; - - void terminate(grpc::Status status); - protected: Response *response; grpc::Status status; public: ServerReadReactorBase(Response *response); - - void OnDone() override; void OnReadDone(bool ok) override; - + void terminate(const grpc::Status &status) override; + void OnDone() override; virtual std::unique_ptr readRequest(Request request) = 0; - virtual void initialize(){}; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; -template -void ServerReadReactorBase::terminate(grpc::Status status) { - this->status = status; - try { - this->terminateCallback(); - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->finished) { - return; - } - this->Finish(this->status); - this->finished = true; -} - template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { - this->initialize(); + this->state = ReactorState::RUNNING; this->StartRead(&this->request); } -template -void ServerReadReactorBase::OnDone() { - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; -} - template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { @@ -92,6 +59,35 @@ this->StartRead(&this->request); } +template +void ServerReadReactorBase::terminate( + const grpc::Status &status) { + this->status = status; + try { + this->terminateCallback(); + this->validate(); + } catch (std::runtime_error &e) { + this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + if (!this->status.ok()) { + std::cout << "error: " << this->status.error_message() << std::endl; + } + if (this->state != ReactorState::RUNNING) { + return; + } + this->Finish(this->status); + this->state = ReactorState::TERMINATED; +} + +template +void ServerReadReactorBase::OnDone() { + this->state = ReactorState::DONE; + this->doneCallback(); + // This looks weird but apparently it is okay to do this. More information: + // https://phabricator.ashoat.com/D3246#87890 + delete this; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h --- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include #include @@ -12,14 +14,11 @@ namespace reactor { template -class ServerWriteReactorBase : public grpc::ServerWriteReactor { +class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { Response response; bool initialized = false; - std::atomic finished = false; - void terminate(grpc::Status status); void nextWrite(); - protected: // this is a const ref since it's not meant to be modified const Request &request; @@ -29,18 +28,17 @@ ServerWriteReactorBase(const Request *request); void start(); - void OnDone() override; + virtual void initialize() {} void OnWriteDone(bool ok) override; + void terminate(const grpc::Status &status); + void OnDone() override; virtual std::unique_ptr writeResponse(Response *response) = 0; - virtual void initialize(){}; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template -void ServerWriteReactorBase::terminate(grpc::Status status) { +void ServerWriteReactorBase::terminate( + const grpc::Status &status) { this->status = status; try { this->terminateCallback(); @@ -51,11 +49,11 @@ if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - if (this->finished) { + if (this->state != ReactorState::RUNNING) { return; } this->Finish(this->status); - this->finished = true; + this->state = ReactorState::TERMINATED; } template @@ -91,6 +89,7 @@ template void ServerWriteReactorBase::start() { + this->state = ReactorState::RUNNING; this->nextWrite(); }