diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h index fb4ddf86e..a34c8ac5b 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,96 +1,97 @@ #include namespace comm { namespace network { namespace reactor { template class ClientBidiReactorBase : public grpc::ClientBidiReactor { std::shared_ptr response = nullptr; bool done = false; bool initialized = 0; protected: Request request; grpc::Status status; public: grpc::ClientContext context; void nextWrite(); void terminate(const grpc::Status &status); bool isDone(); void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; - virtual void doneCallback() { - } + virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientBidiReactorBase::terminate( const grpc::Status &status) { + this->terminateCallback(); if (this->done) { return; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } this->StartWritesDone(); this->status = status; this->done = true; } template bool ClientBidiReactorBase::isDone() { return this->done; } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "read error")); return; } this->nextWrite(); } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); this->doneCallback(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h index 48d7205a5..1da73e9d3 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -1,81 +1,82 @@ #include namespace comm { namespace network { namespace reactor { template class ClientReadReactorBase : public grpc::ClientReadReactor { Response response; grpc::Status status; bool done = false; bool initialized = false; void terminate(const grpc::Status &status); public: Request request; grpc::ClientContext context; void start(); void OnReadDone(bool ok) override; void OnDone(const grpc::Status &status) override; bool isDone(); virtual std::unique_ptr readResponse(const Response &response) = 0; - virtual void doneCallback() { - } + virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template void ClientReadReactorBase::terminate( const grpc::Status &status) { + this->terminateCallback(); if (this->done) { return; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } this->status = status; this->done = true; } template void ClientReadReactorBase::start() { this->StartRead(&this->response); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "read error")); return; } std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartRead(&this->response); } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); this->doneCallback(); } template bool ClientReadReactorBase::isDone() { return this->done; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h index 276044bf2..d2c0e97ae 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,81 +1,82 @@ #include namespace comm { namespace network { namespace reactor { template class ClientWriteReactorBase : public grpc::ClientWriteReactor { grpc::Status status; bool done = false; bool initialized = 0; Request request; public: Response response; grpc::ClientContext context; void nextWrite(); void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); bool isDone(); void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest(Request &request) = 0; - virtual void doneCallback() { - } + virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { + this->terminateCallback(); if (this->done) { return; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } this->status = status; this->done = true; this->StartWritesDone(); } template bool ClientWriteReactorBase::isDone() { return this->done; } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); this->doneCallback(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index c805f99e6..0af2f8ad7 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,110 +1,112 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor { Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; virtual void initialize(){}; virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->initialize(); this->StartRead(&this->request); } template void ServerBidiReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { + this->terminateCallback(); this->status = status; if (!this->status.status.ok()) { std::cout << "error: " << this->status.status.error_message() << std::endl; } if (this->status.sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->status.status); } else { this->Finish(this->status.status); } } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "no more reads"))); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h index ea5296631..c1992ac8d 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,78 +1,80 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor { Request request; void terminate(grpc::Status status); protected: Response *response; grpc::Status status; public: ServerReadReactorBase(Response *response); void OnDone() override; void OnReadDone(bool ok) override; virtual std::unique_ptr readRequest(Request request) = 0; virtual void initialize(){}; virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template void ServerReadReactorBase::terminate(grpc::Status status) { + this->terminateCallback(); if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } this->status = status; this->Finish(status); } template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->initialize(); this->StartRead(&this->request); } template void ServerReadReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "reading error")); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index 93275868f..3b7cd472a 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,94 +1,96 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor { Response response; bool initialized = false; void terminate(grpc::Status status); protected: // this is a const ref since it's not meant to be modified const Request &request; grpc::Status status; public: ServerWriteReactorBase(const Request *request); virtual void NextWrite(); void OnDone() override; void OnWriteDone(bool ok) override; virtual std::unique_ptr writeResponse(Response *response) = 0; virtual void initialize(){}; virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template void ServerWriteReactorBase::terminate(grpc::Status status) { + this->terminateCallback(); if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } this->status = status; this->Finish(status); } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->NextWrite() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // NextWrite has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::NextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->NextWrite(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index 274bc2f49..0cca2a6d9 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,110 +1,112 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status = grpc::Status::OK; bool sendLastResponse = false; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor { Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; virtual void initialize(){}; virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->initialize(); this->StartRead(&this->request); } template void ServerBidiReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { + this->terminateCallback(); this->status = status; if (!this->status.status.ok()) { std::cout << "error: " << this->status.status.error_message() << std::endl; } if (this->status.sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->status.status); } else { this->Finish(this->status.status); } } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "no more reads"))); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h index ea5296631..c1992ac8d 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,78 +1,80 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor { Request request; void terminate(grpc::Status status); protected: Response *response; grpc::Status status; public: ServerReadReactorBase(Response *response); void OnDone() override; void OnReadDone(bool ok) override; virtual std::unique_ptr readRequest(Request request) = 0; virtual void initialize(){}; virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; template void ServerReadReactorBase::terminate(grpc::Status status) { + this->terminateCallback(); if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } this->status = status; this->Finish(status); } template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->initialize(); this->StartRead(&this->request); } template void ServerReadReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "reading error")); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index ee82dbcf1..3b7cd472a 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,87 +1,96 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor { Response response; bool initialized = false; + void terminate(grpc::Status status); + protected: // this is a const ref since it's not meant to be modified const Request &request; grpc::Status status; public: ServerWriteReactorBase(const Request *request); virtual void NextWrite(); void OnDone() override; void OnWriteDone(bool ok) override; virtual std::unique_ptr writeResponse(Response *response) = 0; virtual void initialize(){}; virtual void doneCallback(){}; + virtual void terminateCallback(){}; }; +template +void ServerWriteReactorBase::terminate(grpc::Status status) { + this->terminateCallback(); + if (!this->status.ok()) { + std::cout << "error: " << this->status.error_message() << std::endl; + } + this->status = status; + this->Finish(status); +} + template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->NextWrite() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // NextWrite has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::NextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { - this->Finish(*status); + this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); + this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } - try { - this->NextWrite(); - } catch (std::runtime_error &e) { - this->Finish(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); - } + this->NextWrite(); } } // namespace reactor } // namespace network } // namespace comm