diff --git a/services/backup/docker-server/contents/server/CMakeLists.txt b/services/backup/docker-server/contents/server/CMakeLists.txt
--- a/services/backup/docker-server/contents/server/CMakeLists.txt
+++ b/services/backup/docker-server/contents/server/CMakeLists.txt
@@ -50,6 +50,7 @@
 include_directories(
   ./src
   ./src/DatabaseEntities
+  ./src/Reactors
   ./_generated
   ${FOLLY_INCLUDES}
   ./lib/double-conversion
diff --git a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h
@@ -0,0 +1,83 @@
+#pragma once
+
+#include <grpcpp/grpcpp.h>
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+namespace comm {
+namespace network {
+
+/**
+ * Base class for bidirectional-streaming reactors: reads a request, lets
+ * the derived class handle it, writes the response and reads again.
+ * OnDone() deletes the reactor, so instances have to be heap-allocated.
+ */
+template <class Request, class Response>
+class BidiReactorBase : public grpc::ServerBidiReactor<Request, Response> {
+  Request request;
+  Response response;
+
+public:
+  // starts reading the first request
+  BidiReactorBase();
+
+  void OnDone() override;
+  void OnReadDone(bool ok) override;
+  void OnWriteDone(bool ok) override;
+
+  // Handles one request and fills in `response`. Returning nullptr sends
+  // `response` and continues the stream; returning a status finishes the
+  // RPC with that status.
+  virtual std::unique_ptr<grpc::Status>
+  handleRequest(Request request, Response *response) = 0;
+};
+
+template <class Request, class Response>
+BidiReactorBase<Request, Response>::BidiReactorBase() {
+  this->StartRead(&this->request);
+}
+
+template <class Request, class Response>
+void BidiReactorBase<Request, Response>::OnDone() {
+  delete this;
+}
+
+template <class Request, class Response>
+void BidiReactorBase<Request, Response>::OnReadDone(bool ok) {
+  if (!ok) {
+    // NOTE(review): gRPC also reports `ok == false` once the client has
+    // finished sending; confirm INTERNAL is the intended status for that.
+    this->Finish(
+        grpc::Status(grpc::StatusCode::INTERNAL, "OnReadDone: reading error"));
+    return;
+  }
+  this->response = Response();
+  try {
+    std::unique_ptr<grpc::Status> status =
+        this->handleRequest(this->request, &this->response);
+    if (status != nullptr) {
+      this->Finish(*status);
+      return;
+    }
+    this->StartWrite(&this->response);
+  } catch (const std::exception &e) {
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+  }
+}
+
+template <class Request, class Response>
+void BidiReactorBase<Request, Response>::OnWriteDone(bool ok) {
+  if (!ok) {
+    std::cout << "Server write failed" << std::endl;
+    // finish the RPC: without it OnDone() is never called and the reactor
+    // is never deleted
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
+    return;
+  }
+  this->StartRead(&this->request);
+}
+
+} // namespace 
network
+} // namespace comm
diff --git a/services/backup/docker-server/contents/server/src/Reactors/ReadReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/ReadReactorBase.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/ReadReactorBase.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <grpcpp/grpcpp.h>
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+namespace comm {
+namespace network {
+
+/**
+ * Base class for client-streaming reactors: reads requests one by one and
+ * passes each of them to the derived class.
+ * OnDone() deletes the reactor, so instances have to be heap-allocated.
+ */
+template <class Request, class Response>
+class ReadReactorBase : public grpc::ServerReadReactor<Request> {
+  Request request;
+
+protected:
+  // passed in through the constructor; this class never frees it
+  Response *response;
+
+public:
+  // stores `response` and starts reading the first request
+  ReadReactorBase(Response *response);
+
+  void OnDone() override;
+  void OnReadDone(bool ok) override;
+
+  // Handles one request. Returning nullptr continues reading; returning
+  // a status finishes the RPC with that status.
+  virtual std::unique_ptr<grpc::Status> readRequest(Request request) = 0;
+};
+
+template <class Request, class Response>
+ReadReactorBase<Request, Response>::ReadReactorBase(Response *response)
+    : response(response) {
+  this->StartRead(&this->request);
+}
+
+template <class Request, class Response>
+void ReadReactorBase<Request, Response>::OnDone() {
+  delete this;
+}
+
+template <class Request, class Response>
+void ReadReactorBase<Request, Response>::OnReadDone(bool ok) {
+  if (!ok) {
+    // NOTE(review): gRPC also reports `ok == false` once the client has
+    // finished sending; confirm INTERNAL is the intended status for that.
+    this->Finish(
+        grpc::Status(grpc::StatusCode::INTERNAL, "OnReadDone: reading error"));
+    return;
+  }
+  try {
+    std::unique_ptr<grpc::Status> status = this->readRequest(this->request);
+    if (status != nullptr) {
+      this->Finish(*status);
+      return;
+    }
+  } catch (const std::exception &e) {
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+    return;
+  }
+  this->StartRead(&this->request);
+}
+
+} // namespace network
+} // namespace comm
diff --git a/services/backup/docker-server/contents/server/src/Reactors/WriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/WriteReactorBase.h
new file mode 100644
--- /dev/null
+++ b/services/backup/docker-server/contents/server/src/Reactors/WriteReactorBase.h
@@ -0,0 +1,87 @@
+#pragma once
+
+#include <grpcpp/grpcpp.h>
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+namespace comm {
+namespace network {
+
+/**
+ * Base class for server-streaming reactors: asks the derived class for
+ * consecutive responses and writes them one by one.
+ * Writing has to be started by calling NextWrite() explicitly.
+ * OnDone() deletes the reactor, so instances have to be heap-allocated.
+ */
+template <class Request, class Response>
+class WriteReactorBase : public grpc::ServerWriteReactor<Response> {
+  Response response;
+
+protected:
+  // this is a const ref since it's not meant to be modified
+  const Request &request;
+
+public:
+  WriteReactorBase(const Request *request);
+
+  // Produces the next response and writes it, or finishes the RPC when
+  // writeResponse returns a status or throws.
+  virtual void NextWrite();
+  void OnDone() override;
+  void OnWriteDone(bool ok) override;
+
+  // Fills in the next `response`. Returning nullptr sends it; returning
+  // a status finishes the RPC with that status.
+  virtual std::unique_ptr<grpc::Status> writeResponse(Response *response) = 0;
+};
+
+template <class Request, class Response>
+WriteReactorBase<Request, Response>::WriteReactorBase(const Request *request)
+    : request(*request) {
+  // we cannot call this->NextWrite() here because it's going to call it on
+  // the base class, not derived leading to the runtime error of calling
+  // a pure virtual function
+  // NextWrite has to be exposed as a public function and called explicitly
+  // to initialize writing
+}
+
+template <class Request, class Response>
+void WriteReactorBase<Request, Response>::NextWrite() {
+  try {
+    this->response = Response();
+    std::unique_ptr<grpc::Status> status =
+        this->writeResponse(&this->response);
+    if (status != nullptr) {
+      this->Finish(*status);
+      return;
+    }
+    this->StartWrite(&this->response);
+  } catch (const std::exception &e) {
+    // also covers the initial, explicit call made outside of OnWriteDone
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+  }
+}
+
+template <class Request, class Response>
+void WriteReactorBase<Request, Response>::OnDone() {
+  delete this;
+}
+
+template <class Request, class Response>
+void WriteReactorBase<Request, Response>::OnWriteDone(bool ok) {
+  if (!ok) {
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, "writing error"));
+    return;
+  }
+  try {
+    // NextWrite is virtual, so an override may still throw
+    this->NextWrite();
+  } catch (const std::exception &e) {
+    this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what()));
+  }
+}
+
+} // namespace network
+} // namespace comm