diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h index af32ef2ab..9bb380d5e 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,113 +1,121 @@ #include namespace comm { namespace network { namespace reactor { template class ClientBidiReactorBase : public grpc::ClientBidiReactor { std::shared_ptr response = nullptr; + bool terminated = false; bool done = false; bool initialized = 0; protected: Request request; grpc::Status status = grpc::Status::OK; public: grpc::ClientContext context; void nextWrite(); void terminate(const grpc::Status &status); + bool isTerminated(); bool isDone(); void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientBidiReactorBase::terminate( const grpc::Status &status) { if (this->status.ok()) { this->status = status; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - if (this->done) { + if (this->terminated) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } this->StartWritesDone(); - this->done = true; + this->terminated = true; +} + +template +bool ClientBidiReactorBase::isTerminated() { + return this->terminated; } template bool ClientBidiReactorBase::isDone() { return this->done; } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); + this->done = true; this->doneCallback(); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h index 56c661a3f..34dc8ef6c 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -1,96 +1,106 @@ #include namespace comm { namespace network { namespace reactor { template class ClientReadReactorBase : public grpc::ClientReadReactor { Response response; - grpc::Status status = grpc::Status::OK; bool done = false; + bool terminated = false; bool initialized = false; void terminate(const grpc::Status &status); +protected: + grpc::Status status = grpc::Status::OK; + public: Request request; grpc::ClientContext context; void start(); void OnReadDone(bool ok) override; void OnDone(const grpc::Status &status) override; bool isDone(); + bool isTerminated(); virtual std::unique_ptr readResponse(Response &response) = 0; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ClientReadReactorBase::terminate( const grpc::Status &status) { if (this->status.ok()) { this->status = status; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - if (this->done) { + if (this->terminated) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } - this->done = true; + this->terminated = true; } template void ClientReadReactorBase::start() { this->StartRead(&this->response); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartRead(&this->response); } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { + this->terminated = true; this->terminate(status); this->doneCallback(); } template bool ClientReadReactorBase::isDone() { return this->done; } +template +bool ClientReadReactorBase::isTerminated() { + return this->terminated; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h index ef617982c..3792ae359 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/docker-server/contents/server/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,94 +1,102 @@ #include namespace comm { namespace network { namespace reactor { template class ClientWriteReactorBase : public grpc::ClientWriteReactor { grpc::Status status = grpc::Status::OK; bool done = false; + bool terminated = false; bool initialized = 0; Request request; public: Response response; grpc::ClientContext context; void nextWrite(); void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); bool isDone(); + bool isTerminated(); void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest(Request &request) = 0; virtual void validate(){}; virtual void doneCallback(){}; virtual void terminateCallback(){}; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { if (this->status.ok()) { this->status = status; } if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - if (this->done) { + if (this->terminated) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } - this->done = true; + this->terminated = true; this->StartWritesDone(); } template bool ClientWriteReactorBase::isDone() { return this->done; } +template +bool ClientWriteReactorBase::isTerminated() { + return this->terminated; +} + template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->terminate(status); + this->done = true; this->doneCallback(); } } // namespace reactor } // namespace network } // namespace comm