diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index 11b502347..fafeed911 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,101 +1,101 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); response->set_backupid(this->backupID); this->holder = this->backupID; this->putReactor = std::make_shared( this->holder, this->dataHash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); - if (this->putReactor->getUtility()->state == ReactorState::DONE && - !this->putReactor->getUtility()->getStatus().ok()) { + if (this->putReactor->getStatusHolder()->state == ReactorState::DONE && + !this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->putReactor->getUtility()->getStatus().error_message()); + this->putReactor->getStatusHolder()->getStatus().error_message()); } - if (this->putReactor->getUtility()->state != ReactorState::DONE) { + if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lock2); - } else if (!this->putReactor->getUtility()->getStatus().ok()) { + } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->putReactor->getUtility()->getStatus().error_message()); + this->putReactor->getStatusHolder()->getStatus().error_message()); } try { // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, getCurrentTimestamp(), generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } catch (std::runtime_error &e) { std::cout << "db operations error: " << e.what() << std::endl; } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp index d9c19c96a..4d3f766c9 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,148 +1,148 @@ #include "PullBackupReactor.h" #include "DatabaseManager.h" #include namespace comm { namespace network { namespace reactor { PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } this->getReactor.reset( new reactor::BlobGetClientReactor(holder, this->dataChunks)); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { if (this->getReactor == nullptr) { this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!dataChunk.empty()) { response->set_compactionchunk(dataChunk); return nullptr; } if (!this->dataChunks->isEmpty()) { throw std::runtime_error( "dangling data discovered after reading compaction"); } - if (!this->getReactor->getUtility()->getStatus().ok()) { + if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->getReactor->getUtility()->getStatus().error_message()); + this->getReactor->getStatusHolder()->getStatus().error_message()); } this->state = State::LOGS; } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { // this means that there are no logs at all so we just terminate with the // compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { // we reached the end of the logs collection so we just want to terminate // either we terminate with an error if we have some dangling data // or with success if we don't if (!this->dataChunks->isEmpty()) { throw std::runtime_error("dangling data discovered after reading logs"); } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { // we went out of the scope of the logs collection, this should never // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } // this means that we're not reading anything between invocations of // writeResponse // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor and // proceed this->initializeGetReactor(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logchunk(this->currentLog->getValue()); ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } } // we want to read the chunks from the blob through the get client until we // get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getUtility()->getStatus().ok()) { + if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->getReactor->getUtility()->getStatus().error_message()); + this->getReactor->getStatusHolder()->getStatus().error_message()); } // if we get an empty chunk, we reset the currentLog so we can read the next // one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getUtility()->getStatus().ok()) { + if (!this->getReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->getReactor->getUtility()->getStatus().error_message()); + this->getReactor->getStatusHolder()->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index 20e3f5109..11459c77b 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,158 +1,159 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->generateLogID(), (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateHolder() { // TODO replace mock return generateRandomString(); } std::string SendLogReactor::generateLogID() { // TODO replace mock return generateRandomString(); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = this->generateHolder(); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (this->putReactor->getUtility()->state != ReactorState::DONE) { + if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getUtility()->getStatus().ok()) { + } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( - this->putReactor->getUtility()->getStatus().error_message()); + this->putReactor->getStatusHolder()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " - << this->getUtility()->getStatus().error_code() << "/" - << this->getUtility()->getStatus().error_message() << std::endl; + << this->getStatusHolder()->getStatus().error_code() << "/" + << this->getStatusHolder()->getStatus().error_message() + << std::endl; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/ReactorStatusHolder.h b/services/blob/src/Reactors/ReactorStatusHolder.h deleted file mode 100644 index a0d95a4d8..000000000 --- a/services/blob/src/Reactors/ReactorStatusHolder.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -enum class ReactorState { - NONE = 0, - RUNNING = 1, - TERMINATED = 2, - DONE = 3, -}; - -class ReactorStatusHolder { -private: - grpc::Status status = grpc::Status::OK; - std::mutex statusAccessMutex; - -public: - std::atomic state = ReactorState::NONE; - - grpc::Status getStatus() { - const std::unique_lock lock(this->statusAccessMutex); - return this->status; - } - void setStatus(const grpc::Status &status) { - const std::unique_lock lock(this->statusAccessMutex); - this->status = status; - } -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/BaseReactor.h b/services/lib/src/BaseReactor.h index ed89c7f50..5cb6c5944 100644 --- a/services/lib/src/BaseReactor.h +++ b/services/lib/src/BaseReactor.h @@ -1,24 +1,24 @@ #pragma once #include "ReactorStatusHolder.h" #include #include namespace comm { namespace network { namespace reactor { class BaseReactor { public: - virtual std::shared_ptr getUtility() = 0; + virtual std::shared_ptr getStatusHolder() = 0; virtual void terminate(const grpc::Status &status) = 0; virtual void validate() = 0; virtual void doneCallback() = 0; virtual void terminateCallback() = 0; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/ReactorStatusHolder.h b/services/lib/src/ReactorStatusHolder.h similarity index 100% rename from services/backup/src/Reactors/ReactorStatusHolder.h rename to services/lib/src/ReactorStatusHolder.h diff --git a/services/lib/src/ReactorUtility.h b/services/lib/src/ReactorUtility.h deleted file mode 100644 index a0d95a4d8..000000000 --- a/services/lib/src/ReactorUtility.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace comm { -namespace network { -namespace reactor { - -enum class ReactorState { - NONE = 0, - RUNNING = 1, - TERMINATED = 2, - DONE = 3, -}; - -class ReactorStatusHolder { -private: - grpc::Status status = grpc::Status::OK; - std::mutex statusAccessMutex; - -public: - std::atomic state = ReactorState::NONE; - - grpc::Status getStatus() { - const std::unique_lock lock(this->statusAccessMutex); - return this->status; - } - void setStatus(const grpc::Status &status) { - const std::unique_lock lock(this->statusAccessMutex); - this->status = status; - } -}; - -} // namespace reactor -} // namespace network -} // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h index b7fcaa215..264653834 100644 --- a/services/lib/src/client-base-reactors/ClientBidiReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientBidiReactorBase.h @@ -1,128 +1,128 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { template class ClientBidiReactorBase : public grpc::ClientBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; std::shared_ptr response = nullptr; void nextWrite(); protected: Request request; public: grpc::ClientContext context; void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); } template void ClientBidiReactorBase::start() { - if (this->utility->state != ReactorState::NONE) { + if (this->statusHolder->state != ReactorState::NONE) { return; } - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); this->StartCall(); } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } template void ClientBidiReactorBase::terminate( const grpc::Status &status) { - if (this->utility->getStatus().ok()) { - this->utility->setStatus(status); + if (this->statusHolder->getStatus().ok()) { + this->statusHolder->setStatus(status); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWritesDone(); - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; } template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr -ClientBidiReactorBase::getUtility() { - return this->utility; +ClientBidiReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientReadReactorBase.h b/services/lib/src/client-base-reactors/ClientReadReactorBase.h index 01c4f8bfe..a7529611e 100644 --- a/services/lib/src/client-base-reactors/ClientReadReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientReadReactorBase.h @@ -1,107 +1,107 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { template class ClientReadReactorBase : public grpc::ClientReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Response response; public: Request request; grpc::ClientContext context; void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr readResponse(Response &response) = 0; }; template void ClientReadReactorBase::start() { - if (this->utility->state != ReactorState::NONE) { + if (this->statusHolder->state != ReactorState::NONE) { return; } this->StartRead(&this->response); - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { this->StartCall(); - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartRead(&this->response); } template void ClientReadReactorBase::terminate( const grpc::Status &status) { - if (this->utility->getStatus().ok()) { - this->utility->setStatus(status); + if (this->statusHolder->getStatus().ok()) { + this->statusHolder->setStatus(status); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; } template void ClientReadReactorBase::OnDone( const grpc::Status &status) { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr -ClientReadReactorBase::getUtility() { - return this->utility; +ClientReadReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h index 8b2215d6e..03743c4fe 100644 --- a/services/lib/src/client-base-reactors/ClientWriteReactorBase.h +++ b/services/lib/src/client-base-reactors/ClientWriteReactorBase.h @@ -1,114 +1,114 @@ #pragma once #include "BaseReactor.h" #include namespace comm { namespace network { namespace reactor { template class ClientWriteReactorBase : public grpc::ClientWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Request request; void nextWrite(); public: Response response; grpc::ClientContext context; void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr prepareRequest(Request &request) = 0; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::start() { if (this->start != ReactorState::NONE) { return; } - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { - if (this->utility->getStatus().ok()) { - this->utility->setStatus(status); + if (this->statusHolder->getStatus().ok()) { + this->statusHolder->setStatus(status); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; this->StartWritesDone(); } template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->terminate(status); this->doneCallback(); } template std::shared_ptr -ClientWriteReactorBase::getUtility() { - return this->utility; +ClientWriteReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h index 1b9ae96a3..d1ed86ab7 100644 --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h @@ -1,157 +1,157 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; template class ServerBidiReactorBase : public grpc::ServerBidiReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); void terminate(const grpc::Status &status) override; void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; void terminate(ServerBidiReactorStatus status); ServerBidiReactorStatus getStatus() const; void setStatus(const ServerBidiReactorStatus &status); virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; }; template ServerBidiReactorBase::ServerBidiReactorBase() { - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerBidiReactorBase::terminate( const grpc::Status &status) { this->terminate(ServerBidiReactorStatus(status)); } template void ServerBidiReactorBase::OnDone() { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->setStatus(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->getStatus().status); } else { this->Finish(this->getStatus().status); } - this->utility->state = ReactorState::TERMINATED; + this->statusHolder->state = ReactorState::TERMINATED; } template ServerBidiReactorStatus ServerBidiReactorBase::getStatus() const { return this->status; } template void ServerBidiReactorBase::setStatus( const ServerBidiReactorStatus &status) { this->status = status; } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } template std::shared_ptr -ServerBidiReactorBase::getUtility() { - return this->utility; +ServerBidiReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h index 7ece8fb97..da3abbdba 100644 --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -1,109 +1,109 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Request request; protected: Response *response; public: ServerReadReactorBase(Response *response); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr readRequest(Request request) = 0; }; template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { - this->utility->setStatus(status); + this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } - this->Finish(this->utility->getStatus()); - this->utility->state = ReactorState::TERMINATED; + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; } template void ServerReadReactorBase::OnDone() { - this->utility->state = ReactorState::DONE; + this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr -ServerReadReactorBase::getUtility() { - return this->utility; +ServerReadReactorBase::getStatusHolder() { + return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h index eb7a8c510..a9db947ce 100644 --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -1,131 +1,131 @@ #pragma once #include "BaseReactor.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { - std::shared_ptr utility; + std::shared_ptr statusHolder; Response response; bool initialized = false; void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; public: ServerWriteReactorBase(const Request *request); void start(); void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; virtual void initialize(){}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); void OnDone() override; - std::shared_ptr getUtility() override; + std::shared_ptr getStatusHolder() override; virtual std::unique_ptr writeResponse(Response *response) = 0; }; template void ServerWriteReactorBase::terminate( const grpc::Status &status) { - this->utility->setStatus(status); + this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { - this->utility->setStatus( + this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - if (!this->utility->getStatus().ok()) { - std::cout << "error: " << this->utility->getStatus().error_message() + if (!this->statusHolder->getStatus().ok()) { + std::cout << "error: " << this->statusHolder->getStatus().error_message() << std::endl; } - if (this->utility->state != ReactorState::RUNNING) { + if (this->statusHolder->state != ReactorState::RUNNING) { return; } - this->Finish(this->utility->getStatus()); - this->utility->state = ReactorState::TERMINATED; + this->Finish(this->statusHolder->getStatus()); + this->statusHolder->state = ReactorState::TERMINATED; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::start() { - this->utility->state = ReactorState::RUNNING; + this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr -ServerWriteReactorBase::getUtility() { - return this->utility; +ServerWriteReactorBase::getStatusHolder() { + return this->statusHolder; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm