diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index 8e29d07ec..20e3f5109 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,157 +1,158 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->generateLogID(), (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateHolder() { // TODO replace mock return generateRandomString(); } std::string SendLogReactor::generateLogID() { // TODO replace mock return generateRandomString(); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = this->generateHolder(); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getUtility()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); } else if (!this->putReactor->getUtility()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getUtility()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement - std::cout << "receive logs done " << this->status.error_code() << "/" - << this->status.error_message() << std::endl; + std::cout << "receive logs done " + << this->getUtility()->getStatus().error_code() << "/" + << this->getUtility()->getStatus().error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h index 066d76110..89c647d87 100644 --- a/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerBidiReactorBase.h @@ -1,126 +1,157 @@ #pragma once +#include "BaseReactor.h" + #include #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template -class ServerBidiReactorBase - : public grpc::ServerBidiReactor { +class ServerBidiReactorBase : public grpc::ServerBidiReactor, + public BaseReactor { + std::shared_ptr utility; Request request; Response response; - std::atomic finished = false; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); + void terminate(const grpc::Status &status) override; + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; + std::shared_ptr getUtility() override; void terminate(ServerBidiReactorStatus status); + ServerBidiReactorStatus getStatus() const; + void setStatus(const ServerBidiReactorStatus &status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; - virtual void initialize(){}; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template ServerBidiReactorBase::ServerBidiReactorBase() { - this->initialize(); + this->utility->state = ReactorState::RUNNING; this->StartRead(&this->request); } +template +void ServerBidiReactorBase::terminate( + const grpc::Status &status) { + this->terminate(ServerBidiReactorStatus(status)); +} + template void ServerBidiReactorBase::OnDone() { + this->utility->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { - this->status = status; + this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = ServerBidiReactorStatus( - grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + this->setStatus(ServerBidiReactorStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } - if (this->finished) { + if (this->utility->state != ReactorState::RUNNING) { return; } - if (this->status.sendLastResponse) { + if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( - &this->response, grpc::WriteOptions(), this->status.status); + &this->response, grpc::WriteOptions(), this->getStatus().status); } else { - this->Finish(this->status.status); + this->Finish(this->getStatus().status); } - this->finished = true; + this->utility->state = ReactorState::TERMINATED; +} + +template +ServerBidiReactorStatus +ServerBidiReactorBase::getStatus() const { + return this->status; +} + +template +void ServerBidiReactorBase::setStatus( + const ServerBidiReactorStatus &status) { + this->status = status; } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } +template +std::shared_ptr +ServerBidiReactorBase::getUtility() { + return this->utility; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h index 49efb664a..d427f862d 100644 --- a/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerReadReactorBase.h @@ -1,97 +1,109 @@ #pragma once +#include "BaseReactor.h" + #include #include #include #include #include namespace comm { namespace network { namespace reactor { template -class ServerReadReactorBase : public grpc::ServerReadReactor { +class ServerReadReactorBase : public grpc::ServerReadReactor, + public BaseReactor { + std::shared_ptr utility; Request request; - std::atomic finished = false; - - void terminate(grpc::Status status); protected: Response *response; - grpc::Status status; public: ServerReadReactorBase(Response *response); - void OnDone() override; + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; + void OnDone() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr readRequest(Request request) = 0; - virtual void initialize(){}; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; -template -void ServerReadReactorBase::terminate(grpc::Status status) { - this->status = status; - try { - this->terminateCallback(); - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->finished) { - return; - } - this->Finish(this->status); - this->finished = true; -} - template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { - this->initialize(); + this->utility->state = ReactorState::RUNNING; this->StartRead(&this->request); } -template -void ServerReadReactorBase::OnDone() { - this->doneCallback(); - // This looks weird but apparently it is okay to do this. More information: - // https://phabricator.ashoat.com/D3246#87890 - delete this; -} - template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } +template +void ServerReadReactorBase::terminate( + const grpc::Status &status) { + this->utility->setStatus(status); + try { + this->terminateCallback(); + this->validate(); + } catch (std::runtime_error &e) { + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + } + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; + } + if (this->utility->state != ReactorState::RUNNING) { + return; + } + this->Finish(this->utility->getStatus()); + this->utility->state = ReactorState::TERMINATED; +} + +template +void ServerReadReactorBase::OnDone() { + this->utility->state = ReactorState::DONE; + this->doneCallback(); + // This looks weird but apparently it is okay to do this. More information: + // https://phabricator.ashoat.com/D3246#87890 + delete this; +} + +template +std::shared_ptr +ServerReadReactorBase::getUtility() { + return this->utility; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h index 2f17bc25d..a7b5e737c 100644 --- a/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h +++ b/services/backup/src/Reactors/server/base-reactors/ServerWriteReactorBase.h @@ -1,116 +1,131 @@ #pragma once +#include "BaseReactor.h" + #include #include #include #include #include namespace comm { namespace network { namespace reactor { template -class ServerWriteReactorBase : public grpc::ServerWriteReactor { +class ServerWriteReactorBase : public grpc::ServerWriteReactor, + public BaseReactor { + std::shared_ptr utility; Response response; bool initialized = false; - std::atomic finished = false; - void terminate(grpc::Status status); void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; - grpc::Status status; public: ServerWriteReactorBase(const Request *request); void start(); - void OnDone() override; + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + + virtual void initialize(){}; void OnWriteDone(bool ok) override; + void terminate(const grpc::Status &status); + void OnDone() override; + std::shared_ptr getUtility() override; virtual std::unique_ptr writeResponse(Response *response) = 0; - virtual void initialize(){}; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template -void ServerWriteReactorBase::terminate(grpc::Status status) { - this->status = status; +void ServerWriteReactorBase::terminate( + const grpc::Status &status) { + this->utility->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; } - if (this->finished) { + if (this->utility->state != ReactorState::RUNNING) { return; } - this->Finish(this->status); - this->finished = true; + this->Finish(this->utility->getStatus()); + this->utility->state = ReactorState::TERMINATED; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::start() { + this->utility->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } +template +std::shared_ptr +ServerWriteReactorBase::getUtility() { + return this->utility; +} + template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm