diff --git a/services/blob/src/Reactors/BaseReactor.h b/services/blob/src/Reactors/BaseReactor.h --- a/services/blob/src/Reactors/BaseReactor.h +++ b/services/blob/src/Reactors/BaseReactor.h @@ -2,6 +2,8 @@ #include +#include + namespace comm { namespace network { namespace reactor { @@ -14,12 +16,27 @@ }; class BaseReactor { +private: + grpc::Status status = grpc::Status::OK; + std::mutex statusAccessMutex; + protected: ReactorState state = ReactorState::NONE; + public: + grpc::Status getStatus() { + const std::unique_lock lock(this->statusAccessMutex); + return this->status; + } + void setStatus(const grpc::Status &status) { + const std::unique_lock lock(this->statusAccessMutex); + this->status = status; + } + ReactorState getState() const { return this->state; } + virtual void terminate(const grpc::Status &status){}; virtual void validate(){}; virtual void doneCallback(){}; diff --git a/services/blob/src/Reactors/server/PutReactor.h b/services/blob/src/Reactors/server/PutReactor.h --- a/services/blob/src/Reactors/server/PutReactor.h +++ b/services/blob/src/Reactors/server/PutReactor.h @@ -55,6 +55,7 @@ response->set_dataexists(false); return nullptr; } + std::cout << "here data chunk " << request.datachunk().size() << std::endl; if (request.datachunk().empty()) { return std::make_unique(grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected")); @@ -72,7 +73,7 @@ } void doneCallback() override { - if (!this->status.status.ok()) { + if (!this->getStatus().status.ok()) { return; } if (this->uploader == nullptr && !this->dataExists) { diff --git a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h --- a/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -24,14 +24,15 @@ }; template -class ServerBidiReactorBase - : public grpc::ServerBidiReactor, public BaseReactor { +class ServerBidiReactorBase : public grpc::ServerBidiReactor, + public BaseReactor { Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; + public: ServerBidiReactorBase(); @@ -39,7 +40,9 @@ void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; - void terminate(ServerBidiReactorStatus status); // ??? + void terminate(ServerBidiReactorStatus status); + ServerBidiReactorStatus getStatus() const; + void setStatus(const ServerBidiReactorStatus &status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; @@ -63,26 +66,38 @@ template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + this->setStatus(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } if (this->state != ReactorState::RUNNING) { return; } - if (this->status.sendLastResponse) { + if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->status.status); + &this->response, grpc::WriteOptions(), this->getStatus().status); } else { - this->Finish(this->status.status); + this->Finish(this->getStatus().status); } this->state = ReactorState::TERMINATED; } +template +ServerBidiReactorStatus +ServerBidiReactorBase::getStatus() const { + return this->status; +} + +template +void ServerBidiReactorBase::setStatus( + const ServerBidiReactorStatus &status) { + this->status = status; +} + template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { diff --git a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h --- a/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -19,8 +19,6 @@ Request request; protected: Response *response; - grpc::Status status; - public: ServerReadReactorBase(Response *response); void OnReadDone(bool ok) override; @@ -62,20 +60,20 @@ template void ServerReadReactorBase::terminate( const grpc::Status &status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; } - this->Finish(this->status); + this->Finish(this->getStatus()); this->state = ReactorState::TERMINATED; } diff --git a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h --- a/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/blob/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -22,8 +22,6 @@ protected: // this is a const ref since it's not meant to be modified const Request &request; - grpc::Status status; - public: ServerWriteReactorBase(const Request *request); @@ -39,20 +37,20 @@ template void ServerWriteReactorBase::terminate( const grpc::Status &status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; } - this->Finish(this->status); + this->Finish(this->getStatus()); this->state = ReactorState::TERMINATED; }