diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -69,16 +69,16 @@ } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->getUtility()->state == ReactorState::DONE && - !this->putReactor->getUtility()->getStatus().ok()) { + if (this->putReactor->getStatusHolder()->state == ReactorState::DONE && + !this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->putReactor->getUtility()->getStatus().error_message()); + this->putReactor->getStatusHolder()->getStatus().error_message()); } - if (this->putReactor->getUtility()->state != ReactorState::DONE) { + if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lock2); - } else if (!this->putReactor->getUtility()->getStatus().ok()) { + } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->putReactor->getUtility()->getStatus().error_message()); + this->putReactor->getStatusHolder()->getStatus().error_message()); } try { // TODO add recovery data diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -67,9 +67,9 @@ throw std::runtime_error( "dangling data discovered after reading compaction"); } - if (!this->getReactor->getUtility()->getStatus().ok()) { + if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->getReactor->getUtility()->getStatus().error_message()); + this->getReactor->getStatusHolder()->getStatus().error_message()); } this->state = State::LOGS; } @@ -117,9 +117,9 @@ // get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getUtility()->getStatus().ok()) { + if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->getReactor->getUtility()->getStatus().error_message()); + this->getReactor->getStatusHolder()->getStatus().error_message()); } // if we get an empty chunk, we reset the currentLog so we can read the next // one from the logs collection. @@ -137,9 +137,9 @@ } void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getUtility()->getStatus().ok()) { + if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->getReactor->getUtility()->getStatus().error_message()); + this->getReactor->getStatusHolder()->getStatus().error_message()); } } diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -133,11 +133,11 @@ } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (this->putReactor->getUtility()->state != ReactorState::DONE) { + if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getUtility()->getStatus().ok()) { + } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->putReactor->getUtility()->getStatus().error_message()); + this->putReactor->getStatusHolder()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); @@ -149,8 +149,9 @@ const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " - << this->getUtility()->getStatus().error_code() << "/" - << this->getUtility()->getStatus().error_message() << std::endl; + << this->getStatusHolder()->getStatus().error_code() << "/" + << this->getStatusHolder()->getStatus().error_message() + << std::endl; } } // namespace reactor diff --git a/services/blob/src/Reactors/ReactorStatusHolder.h b/services/blob/src/Reactors/ReactorStatusHolder.h deleted file mode 100644 --- a/services/blob/src/Reactors/ReactorStatusHolder.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -enum class ReactorState { - NONE = 0, - RUNNING = 1, - TERMINATED = 2, - DONE = 3, -}; - -class ReactorStatusHolder { -private: - grpc::Status status = grpc::Status::OK; - std::mutex statusAccessMutex; - -public: - std::atomic state = ReactorState::NONE; - - grpc::Status getStatus() { - const std::unique_lock lock(this->statusAccessMutex); - return this->status; - } - void setStatus(const grpc::Status &status) { - const std::unique_lock lock(this->statusAccessMutex); - this->status = status; - } -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/BaseReactor.h b/services/lib/src/BaseReactor.h --- a/services/lib/src/BaseReactor.h +++ b/services/lib/src/BaseReactor.h @@ -12,7 +12,7 @@ class BaseReactor { public: - virtual std::shared_ptr getUtility() = 0; + virtual std::shared_ptr getStatusHolder() = 0; virtual void terminate(const grpc::Status &status) = 0; virtual void validate() = 0; virtual void doneCallback() = 0; diff --git a/services/backup/src/Reactors/ReactorStatusHolder.h b/services/lib/src/ReactorStatusHolder.h rename from services/backup/src/Reactors/ReactorStatusHolder.h rename to services/lib/src/ReactorStatusHolder.h diff --git a/services/lib/src/ReactorUtility.h b/services/lib/src/ReactorUtility.h deleted file mode 100644 --- a/services/lib/src/ReactorUtility.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -enum class ReactorState { - NONE = 0, - RUNNING = 1, - TERMINATED = 2, - DONE = 3, -}; - -class ReactorStatusHolder { -private: - grpc::Status status = grpc::Status::OK; - std::mutex statusAccessMutex; - -public: - std::atomic state = ReactorState::NONE; - - grpc::Status getStatus() { - const std::unique_lock lock(this->statusAccessMutex); - return this->status; - } - void setStatus(const grpc::Status &status) { - const std::unique_lock lock(this->statusAccessMutex); - this->status = status; - } -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h --- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h @@ -11,7 +11,7 @@ template class ClientBidiReactorBase : public grpc::ClientBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; std::shared_ptr response = nullptr; void nextWrite(); @@ -31,7 +31,7 @@ void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr prepareRequest( Request &request, @@ -57,10 +57,10 @@ template void ClientBidiReactorBase::start() { - if (this->utility->state != ReactorState::NONE) { + if (this->statusHolder->state != ReactorState::NONE) { return; } - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); this->StartCall(); } @@ -88,39 +88,39 @@ template void ClientBidiReactorBase::terminate( const grpc::Status &status) { - if (this->utility->getStatus().ok()) { - this->utility->setStatus(status); + if (this->statusHolder->getStatus().ok()) { + this->statusHolder->setStatus(status); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWritesDone(); - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr -ClientBidiReactorBase::getUtility() { - return this->utility; +ClientBidiReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h --- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientReadReactorBase.h @@ -11,7 +11,7 @@ template class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Response response; public: @@ -27,20 +27,20 @@ void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr readResponse(Response &response) = 0; }; template void ClientReadReactorBase::start() { - if (this->utility->state != ReactorState::NONE) { + if (this->statusHolder->state != ReactorState::NONE) { return; } this->StartRead(&this->response); - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { this->StartCall(); - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; } } @@ -68,38 +68,38 @@ template void ClientReadReactorBase::terminate( const grpc::Status &status) { - if (this->utility->getStatus().ok()) { - this->utility->setStatus(status); + if (this->statusHolder->getStatus().ok()) { + this->statusHolder->setStatus(status); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr -ClientReadReactorBase::getUtility() { - return this->utility; +ClientReadReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h --- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h @@ -11,7 +11,7 @@ template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Request request; void nextWrite(); @@ -29,7 +29,7 @@ void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr prepareRequest(Request &request) = 0; }; @@ -58,7 +58,7 @@ if (this->start != ReactorState::NONE) { return; } - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } @@ -74,39 +74,39 @@ template void ClientWriteReactorBase::terminate( const grpc::Status &status) { - if (this->utility->getStatus().ok()) { - this->utility->setStatus(status); + if (this->statusHolder->getStatus().ok()) { + this->statusHolder->setStatus(status); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; this->StartWritesDone(); } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr -ClientWriteReactorBase::getUtility() { - return this->utility; +ClientWriteReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h @@ -26,7 +26,7 @@ template class ServerBidiReactorBase : public grpc::ServerBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Request request; Response response; @@ -45,7 +45,7 @@ void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; void terminate(ServerBidiReactorStatus status); ServerBidiReactorStatus getStatus() const; @@ -57,7 +57,7 @@ template ServerBidiReactorBase::ServerBidiReactorBase() { - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } @@ -69,7 +69,7 @@ template void ServerBidiReactorBase::OnDone() { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 @@ -87,7 +87,7 @@ this->setStatus(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } if (this->getStatus().sendLastResponse) { @@ -96,7 +96,7 @@ } else { this->Finish(this->getStatus().status); } - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; } template @@ -148,8 +148,8 @@ template std::shared_ptr -ServerBidiReactorBase::getUtility() { - return this->utility; +ServerBidiReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -16,7 +16,7 @@ template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Request request; protected: @@ -32,7 +32,7 @@ void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr readRequest(Request request) = 0; }; @@ -41,7 +41,7 @@ ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } @@ -70,28 +70,28 @@ template void ServerReadReactorBase::terminate( const grpc::Status &status) { - this->utility->setStatus(status); + this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } - this->Finish(this->utility->getStatus()); - this->utility->state = ReactorState::TERMINATED; + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; } template void ServerReadReactorBase::OnDone() { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 @@ -100,8 +100,8 @@ template std::shared_ptr -ServerReadReactorBase::getUtility() { - return this->utility; +ServerReadReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -16,7 +16,7 @@ template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Response response; bool initialized = false; @@ -39,7 +39,7 @@ void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr writeResponse(Response *response) = 0; }; @@ -47,23 +47,23 @@ template void ServerWriteReactorBase::terminate( const grpc::Status &status) { - this->utility->setStatus(status); + this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } - this->Finish(this->utility->getStatus()); - this->utility->state = ReactorState::TERMINATED; + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; } template @@ -99,7 +99,7 @@ template void ServerWriteReactorBase::start() { - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } @@ -113,8 +113,8 @@ template std::shared_ptr -ServerWriteReactorBase::getUtility() { - return this->utility; +ServerWriteReactorBase::getStatusHolder() { + return this->statusHolder; } template