diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index 0af2f8ad7..dadfed526 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,112 +1,118 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor { Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; virtual void initialize(){}; + virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->initialize(); this->StartRead(&this->request); } template void ServerBidiReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { - this->terminateCallback(); this->status = status; - if (!this->status.status.ok()) { - std::cout << "error: " << this->status.status.error_message() << std::endl; + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->status = ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (this->status.sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->status.status); } else { this->Finish(this->status.status); } } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::ABORTED, "no more reads"))); + // Ending a connection on the other side results in the `ok` flag being set + // to false. It makes it impossible to detect a failure based just on the + // flag. We should manually check if the data we received is valid + this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h index c1992ac8d..eeabe5eb5 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,80 +1,89 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor { Request request; void terminate(grpc::Status status); protected: Response *response; grpc::Status status; public: ServerReadReactorBase(Response *response); void OnDone() override; void OnReadDone(bool ok) override; virtual std::unique_ptr readRequest(Request request) = 0; virtual void initialize(){}; + virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ServerReadReactorBase::terminate(grpc::Status status) { + this->status = status; this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - this->status = status; this->Finish(status); } template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->initialize(); this->StartRead(&this->request); } template void ServerReadReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "reading error")); + // Ending a connection on the other side results in the `ok` flag being set + // to false. It makes it impossible to detect a failure based just on the + // flag. We should manually check if the data we received is valid + this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index 0cca2a6d9..dadfed526 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,112 +1,118 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { - grpc::Status status = grpc::Status::OK; - bool sendLastResponse = false; + grpc::Status status; + bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor { Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; virtual void initialize(){}; + virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->initialize(); this->StartRead(&this->request); } template void ServerBidiReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { - this->terminateCallback(); this->status = status; - if (!this->status.status.ok()) { - std::cout << "error: " << this->status.status.error_message() << std::endl; + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->status = ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (this->status.sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->status.status); } else { this->Finish(this->status.status); } } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; - this->terminate(ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::ABORTED, "no more reads"))); + // Ending a connection on the other side results in the `ok` flag being set + // to false. It makes it impossible to detect a failure based just on the + // flag. We should manually check if the data we received is valid + this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h index c1992ac8d..eeabe5eb5 100644 --- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,80 +1,89 @@ #pragma once #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor { Request request; void terminate(grpc::Status status); protected: Response *response; grpc::Status status; public: ServerReadReactorBase(Response *response); void OnDone() override; void OnReadDone(bool ok) override; virtual std::unique_ptr readRequest(Request request) = 0; virtual void initialize(){}; + virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ServerReadReactorBase::terminate(grpc::Status status) { + this->status = status; this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - this->status = status; this->Finish(status); } template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->initialize(); this->StartRead(&this->request); } template void ServerReadReactorBase::OnDone() { this->doneCallback(); delete this; } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { - this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "reading error")); + // Ending a connection on the other side results in the `ok` flag being set + // to false. It makes it impossible to detect a failure based just on the + // flag. We should manually check if the data we received is valid + this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } } // namespace reactor } // namespace network } // namespace comm