diff --git a/services/backup/src/Reactors/BaseReactor.h b/services/backup/src/Reactors/BaseReactor.h index 76e16e365..ed89c7f50 100644 --- a/services/backup/src/Reactors/BaseReactor.h +++ b/services/backup/src/Reactors/BaseReactor.h @@ -1,24 +1,24 @@ #pragma once -#include "ReactorUtility.h" +#include "ReactorStatusHolder.h" #include #include namespace comm { namespace network { namespace reactor { class BaseReactor { public: - virtual std::shared_ptr getUtility() = 0; + virtual std::shared_ptr getUtility() = 0; virtual void terminate(const grpc::Status &status) = 0; virtual void validate() = 0; virtual void doneCallback() = 0; virtual void terminateCallback() = 0; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/ReactorUtility.h b/services/backup/src/Reactors/ReactorStatusHolder.h similarity index 96% rename from services/backup/src/Reactors/ReactorUtility.h rename to services/backup/src/Reactors/ReactorStatusHolder.h index c3ce61f00..a0d95a4d8 100644 --- a/services/backup/src/Reactors/ReactorUtility.h +++ b/services/backup/src/Reactors/ReactorStatusHolder.h @@ -1,39 +1,39 @@ #pragma once #include #include #include namespace comm { namespace network { namespace reactor { enum class ReactorState { NONE = 0, RUNNING = 1, TERMINATED = 2, DONE = 3, }; -class ReactorUtility { +class ReactorStatusHolder { private: grpc::Status status = grpc::Status::OK; std::mutex statusAccessMutex; public: std::atomic state = ReactorState::NONE; grpc::Status getStatus() { const std::unique_lock lock(this->statusAccessMutex); return this->status; } void setStatus(const grpc::Status &status) { const std::unique_lock lock(this->statusAccessMutex); this->status = status; } }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h index fce0b3d4d..b7fcaa215 100644 --- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,128 +1,128 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { template class ClientBidiReactorBase : public grpc::ClientBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; std::shared_ptr response = nullptr; void nextWrite(); protected: Request request; public: grpc::ClientContext context; void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); } template void ClientBidiReactorBase::start() { if (this->utility->state != ReactorState::NONE) { return; } this->utility->state = ReactorState::RUNNING; this->nextWrite(); this->StartCall(); } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } template void ClientBidiReactorBase::terminate( const grpc::Status &status) { if (this->utility->getStatus().ok()) { this->utility->setStatus(status); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWritesDone(); this->utility->state = ReactorState::TERMINATED; } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->utility->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template -std::shared_ptr +std::shared_ptr ClientBidiReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h index 9e2a2a743..01c4f8bfe 100644 --- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -1,107 +1,107 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { template class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Response response; public: Request request; grpc::ClientContext context; void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr readResponse(Response &response) = 0; }; template void ClientReadReactorBase::start() { if (this->utility->state != ReactorState::NONE) { return; } this->StartRead(&this->response); if (this->utility->state != ReactorState::RUNNING) { this->StartCall(); this->utility->state = ReactorState::RUNNING; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartRead(&this->response); } template void ClientReadReactorBase::terminate( const grpc::Status &status) { if (this->utility->getStatus().ok()) { this->utility->setStatus(status); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->utility->state = ReactorState::TERMINATED; } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { this->utility->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template -std::shared_ptr +std::shared_ptr ClientReadReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h index 774eff055..8b2215d6e 100644 --- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,114 +1,114 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Request request; void nextWrite(); public: Response response; grpc::ClientContext context; void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr prepareRequest(Request &request) = 0; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::start() { if (this->start != ReactorState::NONE) { return; } this->utility->state = ReactorState::RUNNING; this->nextWrite(); } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { if (this->utility->getStatus().ok()) { this->utility->setStatus(status); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->utility->state = ReactorState::TERMINATED; this->StartWritesDone(); } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->utility->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template -std::shared_ptr +std::shared_ptr ClientWriteReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index 89c647d87..1b9ae96a3 100644 --- a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,157 +1,157 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void terminate(const grpc::Status &status) override; void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; void terminate(ServerBidiReactorStatus status); ServerBidiReactorStatus getStatus() const; void setStatus(const ServerBidiReactorStatus &status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->utility->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerBidiReactorBase::terminate( const grpc::Status &status) { this->terminate(ServerBidiReactorStatus(status)); } template void ServerBidiReactorBase::OnDone() { this->utility->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->setStatus(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } if (this->utility->state != ReactorState::RUNNING) { return; } if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->getStatus().status); } else { this->Finish(this->getStatus().status); } this->utility->state = ReactorState::TERMINATED; } template ServerBidiReactorStatus ServerBidiReactorBase::getStatus() const { return this->status; } template void ServerBidiReactorBase::setStatus( const ServerBidiReactorStatus &status) { this->status = status; } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } template -std::shared_ptr +std::shared_ptr ServerBidiReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h index d427f862d..7ece8fb97 100644 --- a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,109 +1,109 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Request request; protected: Response *response; public: ServerReadReactorBase(Response *response); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr readRequest(Request request) = 0; }; template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->utility->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { this->utility->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->Finish(this->utility->getStatus()); this->utility->state = ReactorState::TERMINATED; } template void ServerReadReactorBase::OnDone() { this->utility->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template -std::shared_ptr +std::shared_ptr ServerReadReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index a7b5e737c..eb7a8c510 100644 --- a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,131 +1,131 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Response response; bool initialized = false; void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; public: ServerWriteReactorBase(const Request *request); void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; virtual void initialize(){}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr writeResponse(Response *response) = 0; }; template void ServerWriteReactorBase::terminate( const grpc::Status &status) { this->utility->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->Finish(this->utility->getStatus()); this->utility->state = ReactorState::TERMINATED; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::start() { this->utility->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template -std::shared_ptr +std::shared_ptr ServerWriteReactorBase::getUtility() { return this->utility; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/BaseReactor.h b/services/blob/src/Reactors/BaseReactor.h index 76e16e365..ed89c7f50 100644 --- a/services/blob/src/Reactors/BaseReactor.h +++ b/services/blob/src/Reactors/BaseReactor.h @@ -1,24 +1,24 @@ #pragma once -#include "ReactorUtility.h" +#include "ReactorStatusHolder.h" #include #include namespace comm { namespace network { namespace reactor { class BaseReactor { public: - virtual std::shared_ptr getUtility() = 0; + virtual std::shared_ptr getUtility() = 0; virtual void terminate(const grpc::Status &status) = 0; virtual void validate() = 0; virtual void doneCallback() = 0; virtual void terminateCallback() = 0; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/ReactorUtility.h b/services/blob/src/Reactors/ReactorStatusHolder.h similarity index 96% rename from services/blob/src/Reactors/ReactorUtility.h rename to services/blob/src/Reactors/ReactorStatusHolder.h index c3ce61f00..a0d95a4d8 100644 --- a/services/blob/src/Reactors/ReactorUtility.h +++ b/services/blob/src/Reactors/ReactorStatusHolder.h @@ -1,39 +1,39 @@ #pragma once #include #include #include namespace comm { namespace network { namespace reactor { enum class ReactorState { NONE = 0, RUNNING = 1, TERMINATED = 2, DONE = 3, }; -class ReactorUtility { +class ReactorStatusHolder { private: grpc::Status status = grpc::Status::OK; std::mutex statusAccessMutex; public: std::atomic state = ReactorState::NONE; grpc::Status getStatus() { const std::unique_lock lock(this->statusAccessMutex); return this->status; } void setStatus(const grpc::Status &status) { const std::unique_lock lock(this->statusAccessMutex); this->status = status; } }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index 89c647d87..1b9ae96a3 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,157 +1,157 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void terminate(const grpc::Status &status) override; void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; void terminate(ServerBidiReactorStatus status); ServerBidiReactorStatus getStatus() const; void setStatus(const ServerBidiReactorStatus &status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->utility->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerBidiReactorBase::terminate( const grpc::Status &status) { this->terminate(ServerBidiReactorStatus(status)); } template void ServerBidiReactorBase::OnDone() { this->utility->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->setStatus(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } if (this->utility->state != ReactorState::RUNNING) { return; } if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->getStatus().status); } else { this->Finish(this->getStatus().status); } this->utility->state = ReactorState::TERMINATED; } template ServerBidiReactorStatus ServerBidiReactorBase::getStatus() const { return this->status; } template void ServerBidiReactorBase::setStatus( const ServerBidiReactorStatus &status) { this->status = status; } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } template -std::shared_ptr +std::shared_ptr ServerBidiReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h index d427f862d..7ece8fb97 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,109 +1,109 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Request request; protected: Response *response; public: ServerReadReactorBase(Response *response); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr readRequest(Request request) = 0; }; template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->utility->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { this->utility->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->Finish(this->utility->getStatus()); this->utility->state = ReactorState::TERMINATED; } template void ServerReadReactorBase::OnDone() { this->utility->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template -std::shared_ptr +std::shared_ptr ServerReadReactorBase::getUtility() { return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index a7b5e737c..eb7a8c510 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,131 +1,131 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr utility; Response response; bool initialized = false; void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; public: ServerWriteReactorBase(const Request *request); void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; virtual void initialize(){}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr writeResponse(Response *response) = 0; }; template void ServerWriteReactorBase::terminate( const grpc::Status &status) { this->utility->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->utility->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->utility->getStatus().ok()) { std::cout << "error: " << this->utility->getStatus().error_message() << std::endl; } if (this->utility->state != ReactorState::RUNNING) { return; } this->Finish(this->utility->getStatus()); this->utility->state = ReactorState::TERMINATED; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::start() { this->utility->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template -std::shared_ptr +std::shared_ptr ServerWriteReactorBase::getUtility() { return this->utility; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/BaseReactor.h b/services/lib/lib_src/BaseReactor.h similarity index 79% copy from services/backup/src/Reactors/BaseReactor.h copy to services/lib/lib_src/BaseReactor.h index 76e16e365..ed89c7f50 100644 --- a/services/backup/src/Reactors/BaseReactor.h +++ b/services/lib/lib_src/BaseReactor.h @@ -1,24 +1,24 @@ #pragma once -#include "ReactorUtility.h" +#include "ReactorStatusHolder.h" #include #include namespace comm { namespace network { namespace reactor { class BaseReactor { public: - virtual std::shared_ptr getUtility() = 0; + virtual std::shared_ptr getUtility() = 0; virtual void terminate(const grpc::Status &status) = 0; virtual void validate() = 0; virtual void doneCallback() = 0; virtual void terminateCallback() = 0; }; } // namespace reactor } // namespace network } // namespace comm