diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { @@ -7,36 +9,33 @@ namespace reactor { template -class ClientBidiReactorBase - : public grpc::ClientBidiReactor { +class ClientBidiReactorBase : public grpc::ClientBidiReactor, + public BaseReactor { + std::shared_ptr utility; std::shared_ptr response = nullptr; - bool terminated = false; - bool done = false; - bool initialized = 0; - void nextWrite(); protected: Request request; - grpc::Status status = grpc::Status::OK; public: grpc::ClientContext context; void start(); - void terminate(const grpc::Status &status); - bool isTerminated(); - bool isDone(); + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; + std::shared_ptr getUtility() override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template @@ -54,47 +53,16 @@ return; } this->StartWrite(&this->request); - if (!this->initialized) { - this->StartCall(); - this->initialized = true; - } } template void ClientBidiReactorBase::start() { - this->nextWrite(); -} - -template -void ClientBidiReactorBase::terminate( - const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->terminated) { + if (this->utility->state != ReactorState::NONE) { return; } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - this->StartWritesDone(); - this->terminated = true; -} - -template -bool ClientBidiReactorBase::isTerminated() { - return this->terminated; -} - -template -bool ClientBidiReactorBase::isDone() { - return this->done; + this->utility->state = ReactorState::RUNNING; + this->nextWrite(); + this->StartCall(); } template @@ -117,14 +85,44 @@ this->nextWrite(); } +template +void ClientBidiReactorBase::terminate( + const grpc::Status &status) { + if (this->utility->getStatus().ok()) { + this->utility->setStatus(status); + } + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; + } + if (this->utility->state != ReactorState::RUNNING) { + return; + } + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + } + this->StartWritesDone(); + this->utility->state = ReactorState::TERMINATED; +} + template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { + this->utility->state = ReactorState::DONE; this->terminate(status); - this->done = true; this->doneCallback(); } +template +std::shared_ptr +ClientBidiReactorBase::getUtility() { + return this->utility; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { @@ -7,60 +9,38 @@ namespace reactor { template -class ClientReadReactorBase : public grpc::ClientReadReactor { +class ClientReadReactorBase : public grpc::ClientReadReactor, + public BaseReactor { + std::shared_ptr utility; Response response; - bool done = false; - bool terminated = false; - bool initialized = false; - - void terminate(const grpc::Status &status); - -protected: - grpc::Status status = grpc::Status::OK; public: Request request; grpc::ClientContext context; void start(); + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - bool isDone(); - bool isTerminated(); + std::shared_ptr getUtility() override; virtual std::unique_ptr readResponse(Response &response) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template -void ClientReadReactorBase::terminate( - const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->terminated) { +void ClientReadReactorBase::start() { + if (this->utility->state != ReactorState::NONE) { return; } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - this->terminated = true; -} - -template -void ClientReadReactorBase::start() { this->StartRead(&this->response); - if (!this->initialized) { + if (this->utility->state != ReactorState::RUNNING) { this->StartCall(); - this->initialized = true; + this->utility->state = ReactorState::RUNNING; } } @@ -86,21 +66,40 @@ } template -void ClientReadReactorBase::OnDone( +void ClientReadReactorBase::terminate( const grpc::Status &status) { - this->terminated = true; - this->terminate(status); - this->doneCallback(); + if (this->utility->getStatus().ok()) { + this->utility->setStatus(status); + } + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; + } + if (this->utility->state != ReactorState::RUNNING) { + return; + } + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + } + this->utility->state = ReactorState::TERMINATED; } template -bool ClientReadReactorBase::isDone() { - return this->done; +void ClientReadReactorBase::OnDone( + const grpc::Status &status) { + this->utility->state = ReactorState::DONE; + this->terminate(status); + this->doneCallback(); } template -bool ClientReadReactorBase::isTerminated() { - return this->terminated; +std::shared_ptr +ClientReadReactorBase::getUtility() { + return this->utility; } } // namespace reactor diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { @@ -7,11 +9,9 @@ namespace reactor { template -class ClientWriteReactorBase : public grpc::ClientWriteReactor { - grpc::Status status = grpc::Status::OK; - bool done = false; - bool terminated = false; - bool initialized = 0; +class ClientWriteReactorBase : public grpc::ClientWriteReactor, + public BaseReactor { + std::shared_ptr utility; Request request; void nextWrite(); @@ -20,16 +20,18 @@ Response response; grpc::ClientContext context; + void start(); + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnWriteDone(bool ok) override; - void terminate(const grpc::Status &status); - bool isDone(); - bool isTerminated(); + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; + std::shared_ptr getUtility() override; virtual std::unique_ptr prepareRequest(Request &request) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template @@ -53,6 +55,10 @@ template void ClientWriteReactorBase::start() { + if (this->start != ReactorState::NONE) { + return; + } + this->utility->state = ReactorState::RUNNING; this->nextWrite(); } @@ -68,43 +74,41 @@ template void ClientWriteReactorBase::terminate( const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; + if (this->utility->getStatus().ok()) { + this->utility->setStatus(status); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; } - if (this->terminated) { + if (this->utility->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->terminated = true; + this->utility->state = ReactorState::TERMINATED; this->StartWritesDone(); } -template -bool ClientWriteReactorBase::isDone() { - return this->done; -} - -template -bool ClientWriteReactorBase::isTerminated() { - return this->terminated; -} - template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { + this->utility->state = ReactorState::DONE; this->terminate(status); - this->done = true; this->doneCallback(); } +template +std::shared_ptr +ClientWriteReactorBase::getUtility() { + return this->utility; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -28,7 +28,6 @@ std::unique_ptr readResponse(blob::GetResponse &response) override; void doneCallback() override; - grpc::Status getStatus() const; }; } // namespace reactor diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -23,10 +23,6 @@ this->dataChunks->write(""); } -grpc::Status BlobGetClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h @@ -46,7 +46,6 @@ blob::PutRequest &request, std::shared_ptr previousResponse) override; void doneCallback() override; - grpc::Status getStatus() const; }; } // namespace reactor diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -53,10 +53,6 @@ this->terminationNotifier->notify_one(); } -grpc::Status BlobPutClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -68,10 +68,16 @@ } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { + if (this->putReactor->getUtility()->state == ReactorState::DONE && + !this->putReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->putReactor->getUtility()->getStatus().error_message()); + } + if (this->putReactor->getUtility()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lock2); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); + } else if (!this->putReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->putReactor->getUtility()->getStatus().error_message()); } try { // TODO add recovery data diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -67,8 +67,9 @@ throw std::runtime_error( "dangling data discovered after reading compaction"); } - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (!this->getReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->getReactor->getUtility()->getStatus().error_message()); } this->state = State::LOGS; } @@ -116,8 +117,9 @@ // get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (!this->getReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->getReactor->getUtility()->getStatus().error_message()); } // if we get an empty chunk, we reset the currentLog so we can read the next // one from the logs collection. @@ -135,8 +137,9 @@ } void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (!this->getReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->getReactor->getUtility()->getStatus().error_message()); } } diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -133,10 +133,11 @@ } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { + if (this->putReactor->getUtility()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); + } else if (!this->putReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->putReactor->getUtility()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase();