diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h index 178ff90a7..308cba23b 100644 --- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h @@ -1,127 +1,129 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - write N requests to the server // - terminate the connection template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; + bool initialized = false; void nextWrite(); public: Response response; grpc::ClientContext context; // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; // - argument request - request that should be edited and is going to be sent // in the current cycle to the server // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr prepareRequest(Request &request) = 0; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::start() { - if (this->start != ReactorState::NONE) { + throw std::runtime_error("this class has not been tested"); + if (this->statusHolder->state != ReactorState::NONE) { return; } this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { if (this->statusHolder->getStatus().ok()) { this->statusHolder->setStatus(status); } if (!this->statusHolder->getStatus().ok()) { std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->statusHolder->state = ReactorState::TERMINATED; this->StartWritesDone(); } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr ClientWriteReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm