diff --git a/services/backup/docker-server/contents/server/CMakeLists.txt b/services/backup/docker-server/contents/server/CMakeLists.txt --- a/services/backup/docker-server/contents/server/CMakeLists.txt +++ b/services/backup/docker-server/contents/server/CMakeLists.txt @@ -50,7 +50,11 @@ include_directories( ./src ./src/DatabaseEntities - ./src/Reactors + ./src/Reactors/server + ./src/Reactors/server/base-reactors + ./src/Reactors/client + ./src/Reactors/client/blob + ./src/Reactors/client/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion diff --git a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp --- a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp +++ b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp @@ -1,7 +1,7 @@ #include "BackupServiceImpl.h" -#include "BidiReactorBase.h" -#include "ReadReactorBase.h" +#include "ServerBidiReactorBase.h" +#include "ServerReadReactorBase.h" #include @@ -20,7 +20,7 @@ backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { - class CreateNewBackupReactor : public BidiReactorBase< + class CreateNewBackupReactor : public reactor::ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { public: @@ -39,12 +39,13 @@ grpc::ServerReadReactor *BackupServiceImpl::SendLog( grpc::CallbackServerContext *context, google::protobuf::Empty *response) { - class SendLogReactor : public ReadReactorBase< + class SendLogReactor : public reactor::ServerReadReactorBase< backup::SendLogRequest, google::protobuf::Empty> { public: - using ReadReactorBase:: - ReadReactorBase; + using ServerReadReactorBase< + backup::SendLogRequest, + google::protobuf::Empty>::ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override { // TODO handle request @@ -60,7 +61,7 @@ backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { - class RecoverBackupKeyReactor : public BidiReactorBase< + class RecoverBackupKeyReactor : public reactor::ServerBidiReactorBase< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> { public: @@ -78,7 +79,7 @@ grpc::ServerBidiReactor * BackupServiceImpl::PullBackup(grpc::CallbackServerContext *context) { - class PullBackupReactor : public BidiReactorBase< + class PullBackupReactor : public reactor::ServerBidiReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { public: diff --git a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h deleted file mode 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/BidiReactorBase.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace comm { -namespace network { - -template -class BidiReactorBase : public grpc::ServerBidiReactor { - Request request; - Response response; - -public: - BidiReactorBase(); - - void OnDone() override; - void OnReadDone(bool ok) override; - void OnWriteDone(bool ok) override; - - virtual std::unique_ptr - handleRequest(Request request, Response *response) = 0; -}; - -template -BidiReactorBase::BidiReactorBase() { - this->StartRead(&this->request); -} - -template -void BidiReactorBase::OnDone() { - delete this; -} - -template -void BidiReactorBase::OnReadDone(bool ok) { - if (!ok) { - this->Finish( - grpc::Status(grpc::StatusCode::INTERNAL, "OnReadDone: reading error")); - return; - } - this->response = Response(); - try { - std::unique_ptr status = - this->handleRequest(this->request, &this->response); - if (status != nullptr) { - this->Finish(*status); - return; - } - this->StartWrite(&this->response); - } catch (std::runtime_error &e) { - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } -} - -template -void BidiReactorBase::OnWriteDone(bool ok) { - if (!ok) { - std::cout << "Server write failed" << std::endl; - return; - } - this->StartRead(&this->request); -} - -} // namespace network -} // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h new file mode 100644 --- /dev/null +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -0,0 +1,93 @@ +#pragma once + +#include + +#include +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +template +class ServerBidiReactorBase + : public grpc::ServerBidiReactor { + Request request; + Response response; + +protected: + grpc::Status status; + bool readingAborted = false; + bool sendLastResponse = false; + +public: + ServerBidiReactorBase(); + + void OnDone() override; + void OnReadDone(bool ok) override; + void OnWriteDone(bool ok) override; + + void terminate(grpc::Status status); + + virtual std::unique_ptr + handleRequest(Request request, Response *response) = 0; + virtual void initialize(){}; + virtual void doneCallback(){}; +}; + +template +ServerBidiReactorBase::ServerBidiReactorBase() { + this->initialize(); + this->StartRead(&this->request); +} + +template +void ServerBidiReactorBase::OnDone() { + this->doneCallback(); + delete this; +} + +template +void ServerBidiReactorBase::terminate(grpc::Status status) { + this->status = status; + if (this->sendLastResponse) { + this->StartWriteAndFinish(&this->response, grpc::WriteOptions(), status); + } else { + this->Finish(status); + } +} + +template +void ServerBidiReactorBase::OnReadDone(bool ok) { + if (!ok) { + this->readingAborted = true; + this->terminate(grpc::Status(grpc::StatusCode::ABORTED, "no more reads")); + return; + } + try { + this->response = Response(); + std::unique_ptr status = + this->handleRequest(this->request, &this->response); + if (status != nullptr) { + this->terminate(*status); + return; + } + this->StartWrite(&this->response); + } catch (std::runtime_error &e) { + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + } +} + +template +void ServerBidiReactorBase::OnWriteDone(bool ok) { + if (!ok) { + std::cout << "Server write failed" << std::endl; + return; + } + this->StartRead(&this->request); +} + +} // namespace reactor +} // namespace network +} // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/ReadReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h rename from services/backup/docker-server/contents/server/src/Reactors/ReadReactorBase.h rename to services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h --- a/services/backup/docker-server/contents/server/src/Reactors/ReadReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,59 +1,75 @@ #pragma once #include + #include #include #include namespace comm { namespace network { +namespace reactor { template -class ReadReactorBase : public grpc::ServerReadReactor { +class ServerReadReactorBase : public grpc::ServerReadReactor { Request request; + void terminate(grpc::Status status); + protected: Response *response; + grpc::Status status; public: - ReadReactorBase(Response *response); + ServerReadReactorBase(Response *response); void OnDone() override; void OnReadDone(bool ok) override; virtual std::unique_ptr readRequest(Request request) = 0; + virtual void initialize(){}; + virtual void doneCallback(){}; }; template -ReadReactorBase::ReadReactorBase(Response *response) +void ServerReadReactorBase::terminate(grpc::Status status) { + this->status = status; + this->Finish(status); +} + +template +ServerReadReactorBase::ServerReadReactorBase( + Response *response) : response(response) { + this->initialize(); this->StartRead(&this->request); } template -void ReadReactorBase::OnDone() { +void ServerReadReactorBase::OnDone() { + this->doneCallback(); delete this; } template -void ReadReactorBase::OnReadDone(bool ok) { +void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { - this->Finish( - grpc::Status(grpc::StatusCode::INTERNAL, "OnReadDone: reading error")); + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "reading error")); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { - this->Finish(*status); + this->terminate(*status); return; } } catch (std::runtime_error &e) { - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } +} // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/WriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h rename from services/backup/docker-server/contents/server/src/Reactors/WriteReactorBase.h rename to services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h --- a/services/backup/docker-server/contents/server/src/Reactors/WriteReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,33 +1,39 @@ #pragma once #include + #include #include #include namespace comm { namespace network { +namespace reactor { template -class WriteReactorBase : public grpc::ServerWriteReactor { +class ServerWriteReactorBase : public grpc::ServerWriteReactor { Response response; + bool initialized = false; protected: // this is a const ref since it's not meant to be modified const Request &request; public: - WriteReactorBase(const Request *request); + ServerWriteReactorBase(const Request *request); virtual void NextWrite(); void OnDone() override; void OnWriteDone(bool ok) override; virtual std::unique_ptr writeResponse(Response *response) = 0; + virtual void initialize(){}; + virtual void doneCallback(){}; }; template -WriteReactorBase::WriteReactorBase(const Request *request) +ServerWriteReactorBase::ServerWriteReactorBase( + const Request *request) : request(*request) { // we cannot call this->NextWrite() here because it's going to call it on // the base class, not derived leading to the runtime error of calling @@ -37,23 +43,33 @@ } template -void WriteReactorBase::NextWrite() { - this->response = Response(); - std::unique_ptr status = this->writeResponse(&this->response); - if (status != nullptr) { - this->Finish(*status); - return; +void ServerWriteReactorBase::NextWrite() { + try { + if (!this->initialized) { + this->initialize(); + this->initialized = true; + } + this->response = Response(); + std::unique_ptr status = this->writeResponse(&this->response); + if (status != nullptr) { + this->Finish(*status); + return; + } + this->StartWrite(&this->response); + } catch (std::runtime_error &e) { + std::cout << "error: " << e.what() << std::endl; + this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->StartWrite(&this->response); } template -void WriteReactorBase::OnDone() { +void ServerWriteReactorBase::OnDone() { + this->doneCallback(); delete this; } template -void WriteReactorBase::OnWriteDone(bool ok) { +void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; @@ -65,5 +81,6 @@ } } +} // namespace reactor } // namespace network } // namespace comm