diff --git a/services/blob/docker-server/contents/server/CMakeLists.txt b/services/blob/docker-server/contents/server/CMakeLists.txt
--- a/services/blob/docker-server/contents/server/CMakeLists.txt
+++ b/services/blob/docker-server/contents/server/CMakeLists.txt
@@ -52,6 +52,8 @@
 include_directories(
   ./src
   ./src/DatabaseEntities
+  ./src/Reactors/server
+  ./src/Reactors/server/base-reactors
   ./_generated
   ${FOLLY_INCLUDES}
   ./lib/double-conversion
diff --git a/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
new file mode 100644
--- /dev/null
+++ b/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h
@@ -0,0 +1,128 @@
+#pragma once
+
+#include <grpcpp/grpcpp.h>
+
+#include <exception>
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+/**
+ * Base class for server-side bidirectional-streaming reactors.
+ *
+ * Runs a read -> handleRequest() -> write loop: each request that is read
+ * successfully is passed to handleRequest(), the response it filled in is
+ * written back and the next read starts once that write has completed.
+ * The reactor deletes itself in OnDone(), so it has to be allocated with
+ * `new`.
+ */
+template <class Request, class Response>
+class ServerBidiReactorBase
+    : public grpc::ServerBidiReactor<Request, Response> {
+  Request request;
+  Response response;
+  // initialize() is deferred to the first OnReadDone(): a virtual call made
+  // from the constructor would only reach this class' empty implementation
+  bool initialized = false;
+
+protected:
+  // the status most recently used to finish the RPC
+  grpc::Status status;
+  // set once a read completed with ok == false
+  bool readingAborted = false;
+  // when true, terminate() sends the current response with the final status
+  bool sendLastResponse = false;
+
+public:
+  ServerBidiReactorBase();
+
+  void OnDone() override;
+  void OnReadDone(bool ok) override;
+  void OnWriteDone(bool ok) override;
+
+  // finishes the RPC with the given status
+  void terminate(grpc::Status status);
+
+  // Processes one request and fills in the response. Returning nullptr
+  // writes the response and keeps the stream going, returning a status
+  // terminates the RPC with it.
+  virtual std::unique_ptr<grpc::Status>
+  handleRequest(Request request, Response *response) = 0;
+  // hook invoked once, before the first read result is processed
+  virtual void initialize(){};
+  // hook invoked from OnDone() right before the reactor deletes itself
+  virtual void doneCallback(){};
+};
+
+template <class Request, class Response>
+ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() {
+  this->StartRead(&this->request);
+}
+
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::OnDone() {
+  this->doneCallback();
+  delete this;
+}
+
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::terminate(grpc::Status status) {
+  this->status = status;
+  if (this->sendLastResponse) {
+    this->StartWriteAndFinish(&this->response, grpc::WriteOptions(), status);
+  } else {
+    this->Finish(status);
+  }
+}
+
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::OnReadDone(bool ok) {
+  try {
+    if (!this->initialized) {
+      this->initialize();
+      this->initialized = true;
+    }
+    if (!ok) {
+      this->readingAborted = true;
+      // NOTE(review): ok == false presumably also covers the client closing
+      // its side of the stream normally - confirm ABORTED is the intended
+      // status in that case
+      this->terminate(
+          grpc::Status(grpc::StatusCode::ABORTED, "no more reads"));
+      return;
+    }
+    this->response = Response();
+    std::unique_ptr<grpc::Status> status =
+        this->handleRequest(this->request, &this->response);
+    if (status != nullptr) {
+      this->terminate(*status);
+      return;
+    }
+    this->StartWrite(&this->response);
+  } catch (const std::exception &e) {
+    // any standard exception ends the RPC instead of escaping into gRPC
+    this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+  }
+}
+
+template <class Request, class Response>
+void ServerBidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
+  if (!ok) {
+    std::cout << "Server write failed" << std::endl;
+    // Finish the RPC so that OnDone() runs and the reactor gets deleted.
+    // Finish() is called directly because terminate() may try to write
+    // the last response, and the previous write has just failed.
+    this->status = grpc::Status(grpc::StatusCode::ABORTED, "write failed");
+    this->Finish(this->status);
+    return;
+  }
+  this->StartRead(&this->request);
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm
diff --git a/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h
new file mode 100644
--- /dev/null
+++ b/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h
@@ -0,0 +1,99 @@
+#pragma once
+
+#include <grpcpp/grpcpp.h>
+
+#include <exception>
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+/**
+ * Base class for server-side client-streaming reactors.
+ *
+ * Reads requests one by one and passes each of them to readRequest() until
+ * either a read fails or readRequest() returns a status. The reactor deletes
+ * itself in OnDone(), so it has to be allocated with `new`.
+ */
+template <class Request, class Response>
+class ServerReadReactorBase : public grpc::ServerReadReactor<Request> {
+  Request request;
+  // initialize() is deferred to the first OnReadDone(): a virtual call made
+  // from the constructor would only reach this class' empty implementation
+  bool initialized = false;
+
+  void terminate(grpc::Status status);
+
+protected:
+  // the response object that was passed to the constructor
+  Response *response;
+  // the status most recently passed to terminate()
+  grpc::Status status;
+
+public:
+  ServerReadReactorBase(Response *response);
+
+  void OnDone() override;
+  void OnReadDone(bool ok) override;
+
+  // Processes one request. Returning nullptr continues reading, returning a
+  // status finishes the RPC with it.
+  virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
+  // hook invoked once, before the first read result is processed
+  virtual void initialize(){};
+  // hook invoked from OnDone() right before the reactor deletes itself
+  virtual void doneCallback(){};
+};
+
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::terminate(grpc::Status status) {
+  this->status = status;
+  this->Finish(status);
+}
+
+template <class Request, class Response>
+ServerReadReactorBase<Request, Response>::ServerReadReactorBase(
+    Response *response)
+    : response(response) {
+  this->StartRead(&this->request);
+}
+
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::OnDone() {
+  this->doneCallback();
+  delete this;
+}
+
+template <class Request, class Response>
+void ServerReadReactorBase<Request, Response>::OnReadDone(bool ok) {
+  try {
+    if (!this->initialized) {
+      this->initialize();
+      this->initialized = true;
+    }
+    if (!ok) {
+      // NOTE(review): ok == false presumably also covers the client having
+      // finished sending requests - confirm INTERNAL is the intended status
+      // in that case
+      this->terminate(
+          grpc::Status(grpc::StatusCode::INTERNAL, "reading error"));
+      return;
+    }
+    std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
+    if (status != nullptr) {
+      this->terminate(*status);
+      return;
+    }
+  } catch (const std::exception &e) {
+    this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+    return;
+  }
+  this->StartRead(&this->request);
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm
diff --git a/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
new file mode 100644
--- /dev/null
+++ b/services/blob/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include <grpcpp/grpcpp.h>
+
+#include <exception>
+#include <iostream>
+#include <memory>
+#include <string>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+/**
+ * Base class for server-side server-streaming reactors.
+ *
+ * Each NextWrite() call asks writeResponse() for the next response and
+ * writes it; a completed write triggers the following NextWrite(). The
+ * stream ends when writeResponse() returns a status. The reactor deletes
+ * itself in OnDone(), so it has to be allocated with `new`.
+ */
+template <class Request, class Response>
+class ServerWriteReactorBase : public grpc::ServerWriteReactor<Response> {
+  Response response;
+  bool initialized = false;
+
+protected:
+  // this is a const ref since it's not meant to be modified
+  const Request &request;
+
+public:
+  ServerWriteReactorBase(const Request *request);
+
+  virtual void NextWrite();
+  void OnDone() override;
+  void OnWriteDone(bool ok) override;
+
+  // Fills in the next response. Returning nullptr writes it, returning a
+  // status finishes the RPC with it.
+  virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
+  // hook invoked once, at the beginning of the first NextWrite()
+  virtual void initialize(){};
+  // hook invoked from OnDone() right before the reactor deletes itself
+  virtual void doneCallback(){};
+};
+
+template <class Request, class Response>
+ServerWriteReactorBase<Request, Response>::ServerWriteReactorBase(
+    const Request *request)
+    : request(*request) {
+  // we cannot call this->NextWrite() here: during construction virtual calls
+  // are dispatched to this base class, not the derived one, which would end
+  // in the runtime error of calling a pure virtual function
+  // NextWrite has to be exposed as a public function and called explicitly
+  // to initialize writing
+}
+
+template <class Request, class Response>
+void ServerWriteReactorBase<Request, Response>::NextWrite() {
+  try {
+    if (!this->initialized) {
+      this->initialize();
+      this->initialized = true;
+    }
+    this->response = Response();
+    std::unique_ptr<grpc::Status> status =
+        this->writeResponse(&this->response);
+    if (status != nullptr) {
+      this->Finish(*status);
+      return;
+    }
+    this->StartWrite(&this->response);
+  } catch (const std::exception &e) {
+    std::cout << "error: " << e.what() << std::endl;
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+  }
+}
+
+template <class Request, class Response>
+void ServerWriteReactorBase<Request, Response>::OnDone() {
+  this->doneCallback();
+  delete this;
+}
+
+template <class Request, class Response>
+void ServerWriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
+  if (!ok) {
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
+    return;
+  }
+  try {
+    this->NextWrite();
+  } catch (const std::exception &e) {
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+  }
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm