diff --git a/services/backup/src/Reactors/BaseReactor.h b/services/backup/src/Reactors/BaseReactor.h --- a/services/backup/src/Reactors/BaseReactor.h +++ b/services/backup/src/Reactors/BaseReactor.h @@ -2,6 +2,8 @@ #include +#include + namespace comm { namespace network { namespace reactor { @@ -14,12 +16,25 @@ }; class BaseReactor { +private: + grpc::Status status = grpc::Status::OK; + std::mutex statusAccessMutex; protected: ReactorState state = ReactorState::NONE; public: + grpc::Status getStatus() { + const std::unique_lock lock(this->statusAccessMutex); + return this->status; + } + void setStatus(const grpc::Status &status) { + const std::unique_lock lock(this->statusAccessMutex); + this->status = status; + } + ReactorState getState() const { return this->state; } + virtual void terminate(const grpc::Status &status){}; virtual void validate(){}; virtual void doneCallback(){}; diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -16,7 +16,6 @@ protected: Request request; - grpc::Status status = grpc::Status::OK; public: grpc::ClientContext context; @@ -82,11 +81,11 @@ template void ClientBidiReactorBase::terminate( const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; + if (this->getStatus().ok()) { + this->setStatus(status); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; @@ -95,7 +94,7 @@ try { this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWritesDone(); this->state = ReactorState::TERMINATED; diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -12,10 +12,6 @@ class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { Response response; - -protected: - grpc::Status status = grpc::Status::OK; - public: Request request; grpc::ClientContext context; @@ -64,11 +60,11 @@ template void ClientReadReactorBase::terminate( const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; + if (this->getStatus().ok()) { + this->setStatus(status); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; @@ -77,7 +73,7 @@ try { this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->state = ReactorState::TERMINATED; } diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -11,7 +11,6 @@ template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { - grpc::Status status = grpc::Status::OK; Request request; void nextWrite(); @@ -67,11 +66,11 @@ template void ClientWriteReactorBase::terminate( const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; + if (this->getStatus().ok()) { + this->setStatus(status); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; @@ -80,7 +79,7 @@ try { this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->state = ReactorState::TERMINATED; this->StartWritesDone(); diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h @@ -30,7 +30,6 @@ const std::string &hash, std::condition_variable *terminationNotifier); void OnDone(const grpc::Status &status); - grpc::Status getStatus() const; }; } // namespace reactor diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp @@ -14,15 +14,11 @@ } void BlobAppendHolderClientReactor::OnDone(const grpc::Status &status) { - this->status = status; + this->setStatus(status); this->state = ReactorState::DONE; this->terminationNotifier->notify_one(); } -grpc::Status BlobAppendHolderClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -28,7 +28,6 @@ std::unique_ptr readResponse(blob::GetResponse &response) override; void doneCallback() override; - grpc::Status getStatus() const; }; } // namespace reactor diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -23,10 +23,6 @@ this->dataChunks->write(""); } -grpc::Status BlobGetClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h @@ -47,7 +47,6 @@ blob::PutRequest &request, std::shared_ptr previousResponse) override; void doneCallback() override; - grpc::Status getStatus() const; bool getDataExists() const; }; diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -54,10 +54,6 @@ this->terminationNotifier->notify_one(); } -grpc::Status BlobPutClientReactor::getStatus() const { - return this->status; -} - bool BlobPutClientReactor::getDataExists() const { return this->dataExists; } diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -174,8 +174,8 @@ // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement - std::cout << "receive logs done " << this->status.error_code() << "/" - << this->status.error_message() << std::endl; + std::cout << "receive logs done " << this->getStatus().error_code() << "/" + << this->getStatus().error_message() << std::endl; } } // namespace reactor diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h --- a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -24,14 +24,15 @@ }; template -class ServerBidiReactorBase - : public grpc::ServerBidiReactor, public BaseReactor { +class ServerBidiReactorBase : public grpc::ServerBidiReactor, + public BaseReactor { Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; + public: ServerBidiReactorBase(); @@ -40,6 +41,8 @@ void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); + ServerBidiReactorStatus getStatus() const; + void setStatus(const ServerBidiReactorStatus &status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; @@ -63,26 +66,38 @@ template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + this->setStatus(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } if (this->state != ReactorState::RUNNING) { return; } - if (this->status.sendLastResponse) { + if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->status.status); + &this->response, grpc::WriteOptions(), this->getStatus().status); } else { - this->Finish(this->status.status); + this->Finish(this->getStatus().status); } this->state = ReactorState::TERMINATED; } +template +ServerBidiReactorStatus +ServerBidiReactorBase::getStatus() const { + return this->status; +} + +template +void ServerBidiReactorBase::setStatus( + const ServerBidiReactorStatus &status) { + this->status = status; +} + template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h --- a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -19,8 +19,6 @@ Request request; protected: Response *response; - grpc::Status status; - public: ServerReadReactorBase(Response *response); void OnReadDone(bool ok) override; @@ -62,20 +60,20 @@ template void ServerReadReactorBase::terminate( const grpc::Status &status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; } - this->Finish(this->status); + this->Finish(this->getStatus()); this->state = ReactorState::TERMINATED; } diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h --- a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -24,8 +24,6 @@ protected: // this is a const ref since it's not meant to be modified const Request &request; - grpc::Status status; - public: ServerWriteReactorBase(const Request *request); @@ -42,20 +40,20 @@ template void ServerWriteReactorBase::terminate( const grpc::Status &status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->setStatus(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->getStatus().ok()) { + std::cout << "error: " << this->getStatus().error_message() << std::endl; } if (this->state != ReactorState::RUNNING) { return; } - this->Finish(this->status); + this->Finish(this->getStatus()); this->state = ReactorState::TERMINATED; }