diff --git a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h index 1f38299eb..fce0b3d4d 100644 --- a/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientBidiReactorBase.h @@ -1,130 +1,128 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { namespace network { namespace reactor { template -class ClientBidiReactorBase - : public grpc::ClientBidiReactor { +class ClientBidiReactorBase : public grpc::ClientBidiReactor, + public BaseReactor { + std::shared_ptr utility; std::shared_ptr response = nullptr; - bool terminated = false; - bool done = false; - bool initialized = 0; - void nextWrite(); protected: Request request; - grpc::Status status = grpc::Status::OK; public: grpc::ClientContext context; void start(); - void terminate(const grpc::Status &status); - bool isTerminated(); - bool isDone(); + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnWriteDone(bool ok) override; void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; + std::shared_ptr getUtility() override; virtual std::unique_ptr prepareRequest( Request &request, std::shared_ptr previousResponse) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template void ClientBidiReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request, this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartWrite(&this->request); - if (!this->initialized) { - this->StartCall(); - this->initialized = true; - } } template void ClientBidiReactorBase::start() { - this->nextWrite(); -} - -template -void ClientBidiReactorBase::terminate( - const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->terminated) { + if (this->utility->state != ReactorState::NONE) { return; } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - this->StartWritesDone(); - this->terminated = true; -} - -template -bool ClientBidiReactorBase::isTerminated() { - return this->terminated; -} - -template -bool ClientBidiReactorBase::isDone() { - return this->done; + this->utility->state = ReactorState::RUNNING; + this->nextWrite(); + this->StartCall(); } template void ClientBidiReactorBase::OnWriteDone(bool ok) { if (this->response == nullptr) { this->response = std::make_shared(); } this->StartRead(&(*this->response)); } template void ClientBidiReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } this->nextWrite(); } +template +void ClientBidiReactorBase::terminate( + const grpc::Status &status) { + if (this->utility->getStatus().ok()) { + this->utility->setStatus(status); + } + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; + } + if (this->utility->state != ReactorState::RUNNING) { + return; + } + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + } + this->StartWritesDone(); + this->utility->state = ReactorState::TERMINATED; +} + template void ClientBidiReactorBase::OnDone( const grpc::Status &status) { + this->utility->state = ReactorState::DONE; this->terminate(status); - this->done = true; this->doneCallback(); } +template +std::shared_ptr +ClientBidiReactorBase::getUtility() { + return this->utility; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h index cc37c01bf..9e2a2a743 100644 --- a/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientReadReactorBase.h @@ -1,108 +1,107 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { namespace network { namespace reactor { template -class ClientReadReactorBase : public grpc::ClientReadReactor { +class ClientReadReactorBase : public grpc::ClientReadReactor, + public BaseReactor { + std::shared_ptr utility; Response response; - bool done = false; - bool terminated = false; - bool initialized = false; - - void terminate(const grpc::Status &status); - -protected: - grpc::Status status = grpc::Status::OK; public: Request request; grpc::ClientContext context; void start(); + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnReadDone(bool ok) override; + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; - bool isDone(); - bool isTerminated(); + std::shared_ptr getUtility() override; virtual std::unique_ptr readResponse(Response &response) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template -void ClientReadReactorBase::terminate( - const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; - } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; - } - if (this->terminated) { +void ClientReadReactorBase::start() { + if (this->utility->state != ReactorState::NONE) { return; } - this->terminateCallback(); - try { - this->validate(); - } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } - this->terminated = true; -} - -template -void ClientReadReactorBase::start() { this->StartRead(&this->response); - if (!this->initialized) { + if (this->utility->state != ReactorState::RUNNING) { this->StartCall(); - this->initialized = true; + this->utility->state = ReactorState::RUNNING; } } template void ClientReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readResponse(this->response); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartRead(&this->response); } template -void ClientReadReactorBase::OnDone( +void ClientReadReactorBase::terminate( const grpc::Status &status) { - this->terminated = true; - this->terminate(status); - this->doneCallback(); + if (this->utility->getStatus().ok()) { + this->utility->setStatus(status); + } + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; + } + if (this->utility->state != ReactorState::RUNNING) { + return; + } + this->terminateCallback(); + try { + this->validate(); + } catch (std::runtime_error &e) { + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); + } + this->utility->state = ReactorState::TERMINATED; } template -bool ClientReadReactorBase::isDone() { - return this->done; +void ClientReadReactorBase::OnDone( + const grpc::Status &status) { + this->utility->state = ReactorState::DONE; + this->terminate(status); + this->doneCallback(); } template -bool ClientReadReactorBase::isTerminated() { - return this->terminated; +std::shared_ptr +ClientReadReactorBase::getUtility() { + return this->utility; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h index 8b6ff588d..774eff055 100644 --- a/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h +++ b/services/backup/src/Reactors/client/base-reactors/ClientWriteReactorBase.h @@ -1,110 +1,114 @@ #pragma once +#include "BaseReactor.h" + #include namespace comm { namespace network { namespace reactor { template -class ClientWriteReactorBase : public grpc::ClientWriteReactor { - grpc::Status status = grpc::Status::OK; - bool done = false; - bool terminated = false; - bool initialized = 0; +class ClientWriteReactorBase : public grpc::ClientWriteReactor, + public BaseReactor { + std::shared_ptr utility; Request request; void nextWrite(); public: Response response; grpc::ClientContext context; + void start(); + + void validate() override{}; + void doneCallback() override{}; + void terminateCallback() override{}; + void OnWriteDone(bool ok) override; - void terminate(const grpc::Status &status); - bool isDone(); - bool isTerminated(); + void terminate(const grpc::Status &status) override; void OnDone(const grpc::Status &status) override; + std::shared_ptr getUtility() override; virtual std::unique_ptr prepareRequest(Request &request) = 0; - virtual void validate(){}; - virtual void doneCallback(){}; - virtual void terminateCallback(){}; }; template void ClientWriteReactorBase::nextWrite() { this->request = Request(); try { std::unique_ptr status = this->prepareRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } this->StartWrite(&this->request); if (!this->initialized) { this->StartCall(); this->initialized = true; } } template void ClientWriteReactorBase::start() { + if (this->start != ReactorState::NONE) { + return; + } + this->utility->state = ReactorState::RUNNING; this->nextWrite(); } template void ClientWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::UNKNOWN, "write error")); return; } this->nextWrite(); } template void ClientWriteReactorBase::terminate( const grpc::Status &status) { - if (this->status.ok()) { - this->status = status; + if (this->utility->getStatus().ok()) { + this->utility->setStatus(status); } - if (!this->status.ok()) { - std::cout << "error: " << this->status.error_message() << std::endl; + if (!this->utility->getStatus().ok()) { + std::cout << "error: " << this->utility->getStatus().error_message() + << std::endl; } - if (this->terminated) { + if (this->utility->state != ReactorState::RUNNING) { return; } this->terminateCallback(); try { this->validate(); } catch (std::runtime_error &e) { - this->status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + this->utility->setStatus( + grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } - this->terminated = true; + this->utility->state = ReactorState::TERMINATED; this->StartWritesDone(); } -template -bool ClientWriteReactorBase::isDone() { - return this->done; -} - -template -bool ClientWriteReactorBase::isTerminated() { - return this->terminated; -} - template void ClientWriteReactorBase::OnDone( const grpc::Status &status) { + this->utility->state = ReactorState::DONE; this->terminate(status); - this->done = true; this->doneCallback(); } +template +std::shared_ptr +ClientWriteReactorBase::getUtility() { + return this->utility; +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp index be85c41da..c7497d475 100644 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.cpp @@ -1,32 +1,28 @@ #include "BlobGetClientReactor.h" namespace comm { namespace network { namespace reactor { BlobGetClientReactor::BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks) : holder(holder), dataChunks(dataChunks) { } std::unique_ptr BlobGetClientReactor::readResponse(blob::GetResponse &response) { if (!this->dataChunks->write(std::move(*response.mutable_datachunk()))) { throw std::runtime_error("error reading data from the blob service"); } return nullptr; } void BlobGetClientReactor::doneCallback() { this->dataChunks->write(""); } -grpc::Status BlobGetClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h index 162a38f64..a5d9e4e2a 100644 --- a/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobGetClientReactor.h @@ -1,36 +1,35 @@ #pragma once #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include "ClientReadReactorBase.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobGetClientReactor : public ClientReadReactorBase { std::string holder; std::shared_ptr> dataChunks; public: BlobGetClientReactor( const std::string &holder, std::shared_ptr> dataChunks); std::unique_ptr readResponse(blob::GetResponse &response) override; void doneCallback() override; - grpc::Status getStatus() const; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp index 372e7d4ba..c33b4321a 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.cpp @@ -1,62 +1,58 @@ #include "BlobPutClientReactor.h" #include namespace comm { namespace network { namespace reactor { BlobPutClientReactor::BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier) : holder(holder), hash(hash), dataChunks(folly::MPMCQueue(100)), terminationNotifier(terminationNotifier) { } void BlobPutClientReactor::scheduleSendingDataChunk( std::unique_ptr dataChunk) { if (!this->dataChunks.write(std::move(*dataChunk))) { throw std::runtime_error( "Error scheduling sending a data chunk to send to the blob service"); } } std::unique_ptr BlobPutClientReactor::prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) { if (this->state == State::SEND_HOLDER) { this->request.set_holder(this->holder); this->state = State::SEND_HASH; return nullptr; } if (this->state == State::SEND_HASH) { request.set_blobhash(this->hash); this->state = State::SEND_CHUNKS; return nullptr; } if (previousResponse->dataexists()) { return std::make_unique(grpc::Status::OK); } std::string dataChunk; this->dataChunks.blockingRead(dataChunk); if (dataChunk.empty()) { return std::make_unique(grpc::Status::OK); } request.set_datachunk(dataChunk); return nullptr; } void BlobPutClientReactor::doneCallback() { this->terminationNotifier->notify_one(); } -grpc::Status BlobPutClientReactor::getStatus() const { - return this->status; -} - } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h index df6a78de8..f487d2316 100644 --- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h +++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h @@ -1,54 +1,53 @@ #pragma once #include "Constants.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include "ClientBidiReactorBase.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { class BlobPutClientReactor : public ClientBidiReactorBase { enum class State { SEND_HOLDER = 0, SEND_HASH = 1, SEND_CHUNKS = 2, }; State state = State::SEND_HOLDER; const std::string hash; const std::string holder; size_t currentDataSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; folly::MPMCQueue dataChunks; std::condition_variable *terminationNotifier; public: BlobPutClientReactor( const std::string &holder, const std::string &hash, std::condition_variable *terminationNotifier); void scheduleSendingDataChunk(std::unique_ptr dataChunk); std::unique_ptr prepareRequest( blob::PutRequest &request, std::shared_ptr previousResponse) override; void doneCallback() override; - grpc::Status getStatus() const; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index a62c04c12..3977f3757 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,94 +1,100 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); response->set_backupid(this->backupID); this->holder = this->backupID; this->putReactor = std::make_shared( this->holder, this->dataHash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { + if (this->putReactor->getUtility()->state == ReactorState::DONE && + !this->putReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->putReactor->getUtility()->getStatus().error_message()); + } + if (this->putReactor->getUtility()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lock2); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); + } else if (!this->putReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->putReactor->getUtility()->getStatus().error_message()); } try { // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, getCurrentTimestamp(), generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } catch (std::runtime_error &e) { std::cout << "db operations error: " << e.what() << std::endl; } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/PullBackupReactor.cpp b/services/backup/src/Reactors/server/PullBackupReactor.cpp index 09b3d3a5e..d9c19c96a 100644 --- a/services/backup/src/Reactors/server/PullBackupReactor.cpp +++ b/services/backup/src/Reactors/server/PullBackupReactor.cpp @@ -1,145 +1,148 @@ #include "PullBackupReactor.h" #include "DatabaseManager.h" #include namespace comm { namespace network { namespace reactor { PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } this->getReactor.reset( new reactor::BlobGetClientReactor(holder, this->dataChunks)); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { if (this->getReactor == nullptr) { this->initializeGetReactor(this->backupItem->getCompactionHolder()); } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!dataChunk.empty()) { response->set_compactionchunk(dataChunk); return nullptr; } if (!this->dataChunks->isEmpty()) { throw std::runtime_error( "dangling data discovered after reading compaction"); } - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (!this->getReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->getReactor->getUtility()->getStatus().error_message()); } this->state = State::LOGS; } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { // this means that there are no logs at all so we just terminate with the // compaction return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { // we reached the end of the logs collection so we just want to terminate // either we terminate with an error if we have some dangling data // or with success if we don't if (!this->dataChunks->isEmpty()) { throw std::runtime_error("dangling data discovered after reading logs"); } return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { // we went out of the scope of the logs collection, this should never // happen and should be perceived as an error throw std::runtime_error("log index out of bound"); } // this means that we're not reading anything between invocations of // writeResponse // it is only not null when we read data in chunks if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); if (this->currentLog->getPersistedInBlob()) { // if the item is stored in the blob, we initialize the get reactor and // proceed this->initializeGetReactor(this->currentLog->getValue()); } else { // if the item is persisted in the database, we just take it, send the // data to the client and reset currentLog so the next invocation of // writeResponse will take another one from the collection response->set_logchunk(this->currentLog->getValue()); ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } } // we want to read the chunks from the blob through the get client until we // get an empty chunk - a sign of "end of chunks" std::string dataChunk; this->dataChunks->blockingRead(dataChunk); - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (!this->getReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->getReactor->getUtility()->getStatus().error_message()); } // if we get an empty chunk, we reset the currentLog so we can read the next // one from the logs collection. // If there's data inside, we write it to the client and proceed. if (dataChunk.empty()) { ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } void PullBackupReactor::terminateCallback() { - if (!this->getReactor->getStatus().ok()) { - throw std::runtime_error(this->getReactor->getStatus().error_message()); + if (!this->getReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->getReactor->getUtility()->getStatus().error_message()); } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index 7882222b2..8e29d07ec 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,156 +1,157 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->generateLogID(), (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateHolder() { // TODO replace mock return generateRandomString(); } std::string SendLogReactor::generateLogID() { // TODO replace mock return generateRandomString(); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = this->generateHolder(); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); - if (!this->putReactor->isDone()) { + if (this->putReactor->getUtility()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getStatus().ok()) { - throw std::runtime_error(this->putReactor->getStatus().error_message()); + } else if (!this->putReactor->getUtility()->getStatus().ok()) { + throw std::runtime_error( + this->putReactor->getUtility()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->status.error_code() << "/" << this->status.error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm