diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { @@ -8,12 +10,8 @@ template class ClientBidiReactorBase - : public grpc::ClientBidiReactor { + : public grpc::ClientBidiReactor, public BaseReactor { std::shared_ptr response = nullptr; - bool terminated = false; - bool done = false; - bool initialized = 0; - void nextWrite(); protected: @@ -24,19 +22,14 @@ grpc::ClientContext context; void start(); - void terminate(const grpc::Status &status); - bool isTerminated(); - bool isDone(); void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template @@ -54,47 +47,16 @@ return; } this->StartWrite(&this->request); - if (!this->initialized) { - this->StartCall(); - this->initialized = true; - } } template void ClientBidiReactorBase::start() { - this->nextWrite(); -} - -template -void ClientBidiReactorBase::terminate( - const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->terminated) { + if (this->state != ReactorState::NONE) { return; } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - this->StartWritesDone(); - this->terminated = true; -} - -template -bool ClientBidiReactorBase::isTerminated() { - return this->terminated; -} - -template -bool ClientBidiReactorBase::isDone() { - return this->done; + this->state = ReactorState::RUNNING; + this->nextWrite(); + this->StartCall(); } template @@ -117,11 +79,34 @@ this->nextWrite(); } +template +void ClientBidiReactorBase::terminate( + const grpc::Status &status) { + this->state = ReactorState::TERMINATED; + if (this->status.ok()) { + this->status = status; + } + if (!this->status.ok()) { + std::cout << "error: " << this->status.error_message() << std::endl; + } + if (this->state != ReactorState::RUNNING) { + return; + } + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + this->StartWritesDone(); + this->state = ReactorState::TERMINATED; +} + template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { + this->state = ReactorState::DONE; this->terminate(status); - this->done = true; this->doneCallback(); } diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { @@ -7,60 +9,31 @@ namespace reactor { template -class ClientReadReactorBase : public grpc::ClientReadReactor { +class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { Response response; - bool done = false; - bool terminated = false; - bool initialized = false; - - void terminate(const grpc::Status &status); - protected: grpc::Status status = grpc::Status::OK; - public: Request request; grpc::ClientContext context; void start(); void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - bool isDone(); - bool isTerminated(); virtual std::unique_ptr readResponse(Response &response) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template -void ClientReadReactorBase::terminate( - const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->terminated) { +void ClientReadReactorBase::start() { + if (this->state != ReactorState::NONE) { return; } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - this->terminated = true; -} - -template -void ClientReadReactorBase::start() { this->StartRead(&this->response); - if (!this->initialized) { + if (this->state != ReactorState::RUNNING) { this->StartCall(); - this->initialized = true; + this->state = ReactorState::RUNNING; } } @@ -86,21 +59,32 @@ } template -void ClientReadReactorBase::OnDone( +void ClientReadReactorBase::terminate( const grpc::Status &status) { - this->terminated = true; - this->terminate(status); - this->doneCallback(); -} - -template -bool ClientReadReactorBase::isDone() { - return this->done; + if (this->status.ok()) { + this->status = status; + } + if (!this->status.ok()) { + std::cout << "error: " << this->status.error_message() << std::endl; + } + if (this->state != ReactorState::RUNNING) { + return; + } + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + this->state = ReactorState::TERMINATED; } template -bool ClientReadReactorBase::isTerminated() { - return this->terminated; +void ClientReadReactorBase::OnDone( + const grpc::Status &status) { + this->state = ReactorState::DONE; + this->terminate(status); + this->doneCallback(); } } // namespace reactor diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h --- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,5 +1,7 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { @@ -7,29 +9,22 @@ namespace reactor { template -class ClientWriteReactorBase : public grpc::ClientWriteReactor { +class ClientWriteReactorBase : public grpc::ClientWriteReactor, + public BaseReactor { grpc::Status status = grpc::Status::OK; - bool done = false; - bool terminated = false; - bool initialized = 0; Request request; void nextWrite(); - public: Response response; grpc::ClientContext context; + void start(); void OnWriteDone(bool ok) override; - void terminate(const grpc::Status &status); - bool isDone(); - bool isTerminated(); + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; virtual std::unique_ptr prepareRequest(Request &request) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template @@ -53,6 +48,10 @@ template void ClientWriteReactorBase::start() { + if (this->start != ReactorState::NONE) { + return; + } + this->state = ReactorState::RUNNING; this->nextWrite(); } @@ -74,7 +73,7 @@ if (!this->status.ok()) { std::cout << "error: " << this->status.error_message() << std::endl; } - if (this->terminated) { + if (this->state != ReactorState::RUNNING) { return; } this->terminateCallback(); @@ -83,25 +82,15 @@ } catch (std::runtime_error &e) { this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } - this->terminated = true; + this->state = ReactorState::TERMINATED; this->StartWritesDone(); } -template -bool ClientWriteReactorBase::isDone() { - return this->done; -} - -template -bool ClientWriteReactorBase::isTerminated() { - return this->terminated; -} - template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { + this->state = ReactorState::DONE; this->terminate(status); - this->done = true; this->doneCallback(); } diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h --- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.h @@ -3,6 +3,8 @@ #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" +#include "BaseReactor.h" + #include #include @@ -13,8 +15,8 @@ namespace network { namespace reactor { -class BlobAppendHolderClientReactor : public grpc::ClientUnaryReactor { - bool done = false; +class BlobAppendHolderClientReactor : public grpc::ClientUnaryReactor, + public BaseReactor { grpc::Status status = grpc::Status::OK; std::condition_variable *terminationNotifier; @@ -28,7 +30,6 @@ const std::string &hash, std::condition_variable *terminationNotifier); void OnDone(const grpc::Status &status); - bool isDone() const; grpc::Status getStatus() const; }; diff --git a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp --- a/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobAppendHolderClientReactor.cpp @@ -15,14 +15,10 @@ void BlobAppendHolderClientReactor::OnDone(const grpc::Status &status) { this->status = status; - this->done = true; + this->state = ReactorState::DONE; this->terminationNotifier->notify_one(); } -bool BlobAppendHolderClientReactor::isDone() const { - return this->done; -} - grpc::Status BlobAppendHolderClientReactor::getStatus() const { return this->status; } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -85,7 +85,7 @@ } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->isDone()) { + if (this->putReactor->getState() == ReactorState::DONE) { if (!this->putReactor->getStatus().ok()) { throw std::runtime_error(this->putReactor->getStatus().error_message()); } @@ -95,7 +95,7 @@ if (this->putReactor->getDataExists()) { this->initializeHolderReactor(); std::unique_lock lockHolder(this->blobAppendHolderDoneCVMutex); - if (this->holderReactor->isDone()) { + if (this->holderReactor->getState() == ReactorState::DONE) { if (!this->holderReactor->getStatus().ok()) { throw std::runtime_error( this->holderReactor->getStatus().error_message()); diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -150,7 +150,7 @@ } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (this->putReactor->isDone()) { + if (this->putReactor->getState() == ReactorState::DONE) { if (!this->putReactor->getStatus().ok()) { throw std::runtime_error(this->putReactor->getStatus().error_message()); } @@ -160,7 +160,7 @@ if (this->putReactor->getDataExists()) { this->initializeHolderReactor(); std::unique_lock lockHolder(this->blobAppendHolderDoneCVMutex); - if (this->holderReactor->isDone()) { + if (this->holderReactor->getState() == ReactorState::DONE) { if (!this->holderReactor->getStatus().ok()) { throw std::runtime_error( this->holderReactor->getStatus().error_message());