diff --git a/services/backup/docker-server/contents/server/CMakeLists.txt b/services/backup/docker-server/contents/server/CMakeLists.txt --- a/services/backup/docker-server/contents/server/CMakeLists.txt +++ b/services/backup/docker-server/contents/server/CMakeLists.txt @@ -51,6 +51,11 @@ ./src ./src/DatabaseEntities ./src/Reactors + ./src/Reactors/server + ./src/Reactors/server/base-reactors + ./src/Reactors/client + ./src/Reactors/client/blob + ./src/Reactors/client/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion diff --git a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp --- a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp +++ b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp @@ -1,7 +1,7 @@ #include "BackupServiceImpl.h" -#include "BidiReactorBase.h" #include "ReadReactorBase.h" +#include "ServerBidiReactorBase.h" #include <aws/core/Aws.h> @@ -20,16 +20,16 @@ backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { - class CreateNewBackupReactor : public BidiReactorBase< + class CreateNewBackupReactor : public reactor::ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { public: - std::unique_ptr<grpc::Status> handleRequest( + std::unique_ptr<reactor::ServerBidiReactorStatus> handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override { // TODO handle request - return std::make_unique<grpc::Status>( - grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); + return std::make_unique<reactor::ServerBidiReactorStatus>( + grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } }; @@ -60,16 +60,16 @@ backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { - class RecoverBackupKeyReactor : public BidiReactorBase< + class RecoverBackupKeyReactor : public reactor::ServerBidiReactorBase< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> { public: - std::unique_ptr<grpc::Status> handleRequest( + std::unique_ptr<reactor::ServerBidiReactorStatus> handleRequest( backup::RecoverBackupKeyRequest request, backup::RecoverBackupKeyResponse *response) override { // TODO handle request - return std::make_unique<grpc::Status>( - grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); + return std::make_unique<reactor::ServerBidiReactorStatus>( + grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } }; @@ -78,16 +78,16 @@ grpc::ServerBidiReactor<backup::PullBackupRequest, backup::PullBackupResponse> * BackupServiceImpl::PullBackup(grpc::CallbackServerContext *context) { - class PullBackupReactor : public BidiReactorBase< + class PullBackupReactor : public reactor::ServerBidiReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { public: - std::unique_ptr<grpc::Status> handleRequest( + std::unique_ptr<reactor::ServerBidiReactorStatus> handleRequest( backup::PullBackupRequest request, backup::PullBackupResponse *response) override { // TODO handle request - return std::make_unique<grpc::Status>( - grpc::StatusCode::UNIMPLEMENTED, "unimplemented"); + return std::make_unique<reactor::ServerBidiReactorStatus>( + grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "unimplemented")); } }; diff --git a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h deleted file mode 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include <grpcpp/grpcpp.h> -#include <iostream> -#include <memory> -#include <string> - -namespace comm { -namespace network { - -template <class Request, class Response> -class BidiReactorBase : public grpc::ServerBidiReactor<Request, Response> { - Request request; - Response response; - -public: - BidiReactorBase(); - - void OnDone() override; - void OnReadDone(bool ok) override; - void OnWriteDone(bool ok) override; - - virtual std::unique_ptr<grpc::Status> - handleRequest(Request request, Response *response) = 0; -}; - -template <class Request, class Response> -BidiReactorBase<Request, Response>::BidiReactorBase() { - this->StartRead(&this->request); -} - -template <class Request, class Response> -void BidiReactorBase<Request, Response>::OnDone() { - delete this; -} - -template <class Request, class Response> -void BidiReactorBase<Request, Response>::OnReadDone(bool ok) { - if (!ok) { - this->Finish( - grpc::Status(grpc::StatusCode::INTERNAL, "OnReadDone: reading error")); - return; - } - this->response = Response(); - try { - std::unique_ptr<grpc::Status> status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->Finish(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::runtime_error &e) { - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } -} - -template <class Request, class Response> -void BidiReactorBase<Request, Response>::OnWriteDone(bool ok) { - if (!ok) { - std::cout << "Server write failed" << std::endl; - return; - } - this->StartRead(&this->request); -} - -} // namespace network -} // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h new file mode 100644 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -0,0 +1,107 @@ +#pragma once + +#include <grpcpp/grpcpp.h> + +#include <iostream> +#include <memory> +#include <string> + +namespace comm { +namespace network { +namespace reactor { + +struct ServerBidiReactorStatus { + grpc::Status status; + bool sendLastResponse; + ServerBidiReactorStatus( + grpc::Status status = grpc::Status::OK, + bool sendLastResponse = false) + : status(status), sendLastResponse(sendLastResponse) { + } +}; + +template <class Request, class Response> +class ServerBidiReactorBase + : public grpc::ServerBidiReactor<Request, Response> { + Request request; + Response response; + +protected: + ServerBidiReactorStatus status; + bool readingAborted = false; + +public: + ServerBidiReactorBase(); + + void OnDone() override; + void OnReadDone(bool ok) override; + void OnWriteDone(bool ok) override; + + void terminate(ServerBidiReactorStatus status); + + virtual std::unique_ptr<ServerBidiReactorStatus> + handleRequest(Request request, Response *response) = 0; + virtual void initialize(){}; + virtual void doneCallback(){}; +}; + +template <class Request, class Response> +ServerBidiReactorBase<Request, Response>::ServerBidiReactorBase() { + this->initialize(); + this->StartRead(&this->request); +} + +template <class Request, class Response> +void ServerBidiReactorBase<Request, Response>::OnDone() { + this->doneCallback(); + delete this; +} + +template <class Request, class Response> +void ServerBidiReactorBase<Request, Response>::terminate( + ServerBidiReactorStatus status) { + this->status = status; + if (this->status.sendLastResponse) { + this->StartWriteAndFinish( + &this->response, grpc::WriteOptions(), this->status.status); + } else { + this->Finish(this->status.status); + } +} + +template <class Request, class Response> +void ServerBidiReactorBase<Request, Response>::OnReadDone(bool ok) { + if (!ok) { + this->readingAborted = true; + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::ABORTED, "no more reads"))); + return; + } + try { + this->response = Response(); + std::unique_ptr<ServerBidiReactorStatus> status = + this->handleRequest(this->request, &this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + } catch (std::runtime_error &e) { + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); + } +} + +template <class Request, class Response> +void ServerBidiReactorBase<Request, Response>::OnWriteDone(bool ok) { + if (!ok) { + this->terminate(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); + return; + } + this->StartRead(&this->request); +} + +} // namespace reactor +} // namespace network +} // namespace comm