diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h index e0b5a1b14..68bc7fdaf 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -1,131 +1,135 @@ #pragma once #include "DatabaseManager.h" #include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" #include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { USER_ID = 1, KEY_ENTROPY = 2, DATA_HASH = 3, DATA_CHUNKS = 4, }; State state = State::USER_ID; std::string userID; std::string keyEntropy; std::string dataHash; std::string holder; std::string backupID; std::shared_ptr putReactor; ServiceBlobClient blobClient; std::mutex reactorStateMutex; std::condition_variable blobDoneCV; std::mutex blobDoneCVMutex; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; - void terminateCallback(); + void terminateCallback() override; }; std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); response->set_backupid(this->backupID); this->holder = this->backupID; this->putReactor = std::make_shared( this->holder, this->dataHash, &this->blobDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobDoneCVMutex); - this->blobDoneCV.wait(lock2); + if (!this->putReactor->isDone()) { + this->blobDoneCV.wait(lock2); + } else if (!this->putReactor->getStatus().ok()) { + throw std::runtime_error(this->putReactor->getStatus().error_message()); + } try { // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, getCurrentTimestamp(), generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } catch (std::runtime_error &e) { std::cout << "db operations error: " << e.what() << std::endl; } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h index 5430d0245..71a289b45 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/PullBackupReactor.h @@ -1,152 +1,159 @@ #pragma once #include "BlobGetClientReactor.h" #include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" #include "ServerWriteReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include #include #include namespace comm { namespace network { namespace reactor { class PullBackupReactor : public ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse> { enum class State { COMPACTION = 1, LOGS = 2, }; std::shared_ptr backupItem; std::shared_ptr getReactor; std::mutex reactorStateMutex; std::shared_ptr> dataChunks; ServiceBlobClient blobClient; State state = State::COMPACTION; std::vector> logs; size_t currentLogIndex = 0; std::shared_ptr currentLog; void initializeGetReactor(const std::string &holder); public: PullBackupReactor(const backup::PullBackupRequest *request); void initialize() override; std::unique_ptr writeResponse(backup::PullBackupResponse *response) override; + void terminateCallback() override; }; PullBackupReactor::PullBackupReactor(const backup::PullBackupRequest *request) : ServerWriteReactorBase< backup::PullBackupRequest, backup::PullBackupResponse>(request), dataChunks(std::make_shared>(100)) { } void PullBackupReactor::initializeGetReactor(const std::string &holder) { if (this->backupItem == nullptr) { throw std::runtime_error( "get reactor cannot be initialized when backup item is missing"); } if (this->getReactor == nullptr) { this->getReactor = std::make_shared( holder, this->dataChunks); this->getReactor->request.set_holder(holder); this->blobClient.get(this->getReactor); } } void PullBackupReactor::initialize() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->request.userid().empty()) { throw std::runtime_error("no user id provided"); } if (this->request.backupid().empty()) { throw std::runtime_error("no backup id provided"); } this->backupItem = database::DatabaseManager::getInstance().findBackupItem( this->request.userid(), this->request.backupid()); if (this->backupItem == nullptr) { throw std::runtime_error( "no backup found for provided parameters: user id [" + this->request.userid() + "], backup id [" + this->request.backupid() + "]"); } this->logs = database::DatabaseManager::getInstance().findLogItemsForBackup( this->request.backupid()); } std::unique_ptr PullBackupReactor::writeResponse(backup::PullBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); if (this->state == State::COMPACTION) { this->initializeGetReactor(this->backupItem->getCompactionHolder()); std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (!dataChunk.empty()) { response->set_compactionchunk(dataChunk); return nullptr; } if (!this->dataChunks->isEmpty()) { throw std::runtime_error( "dangling data discovered after reading compaction"); } this->getReactor = nullptr; this->state = State::LOGS; } if (this->state == State::LOGS) { // TODO make sure logs are received in correct order regardless their size if (this->logs.empty()) { return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex == this->logs.size()) { return std::make_unique(grpc::Status::OK); } if (this->currentLogIndex > this->logs.size()) { throw std::runtime_error("log index out of bound"); } if (this->currentLog == nullptr) { this->currentLog = this->logs.at(this->currentLogIndex); if (this->currentLog->getPersistedInBlob()) { this->initializeGetReactor(this->currentLog->getValue()); } else { response->set_logchunk(this->currentLog->getValue()); ++this->currentLogIndex; this->currentLog = nullptr; return nullptr; } } std::string dataChunk; this->dataChunks->blockingRead(dataChunk); if (dataChunk.empty()) { ++this->currentLogIndex; this->currentLog = nullptr; } else { response->set_logchunk(dataChunk); } return nullptr; } throw std::runtime_error("unhandled state"); } +void PullBackupReactor::terminateCallback() { + if (!this->getReactor->getStatus().ok()) { + throw std::runtime_error(this->getReactor->getStatus().error_message()); + } +} + } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h index ef1f2f767..59f6a7edb 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h @@ -1,208 +1,212 @@ #pragma once #include "Constants.h" #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" #include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include namespace comm { namespace network { namespace reactor { class SendLogReactor : public ServerReadReactorBase< backup::SendLogRequest, google::protobuf::Empty> { enum class State { USER_ID = 1, BACKUP_ID = 2, LOG_HASH = 3, LOG_CHUNK = 4, }; enum class PersistenceMethod { UNKNOWN = 0, DB = 1, BLOB = 2, }; State state = State::USER_ID; PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; std::string userID; std::string backupID; std::string hash; // either the value itself which is a dump of a single operation (if // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is // true) std::string value; std::mutex reactorStateMutex; std::condition_variable blobDoneCV; std::mutex blobDoneCVMutex; std::shared_ptr putReactor; ServiceBlobClient blobClient; void storeInDatabase(); std::string generateHolder(); std::string generateLogID(); void initializePutReactor(); void storeInBlob(const std::string &data) { } public: using ServerReadReactorBase:: ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; void terminateCallback() override; }; void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->generateLogID(), (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateHolder() { // TODO replace mock return generateRandomString(); } std::string SendLogReactor::generateLogID() { // TODO replace mock return generateRandomString(); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = this->generateHolder(); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobDoneCVMutex); - this->blobDoneCV.wait(lock2); + if (!this->putReactor->isDone()) { + this->blobDoneCV.wait(lock2); + } else if (!this->putReactor->getStatus().ok()) { + throw std::runtime_error(this->putReactor->getStatus().error_message()); + } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->status.error_code() << "/" << this->status.error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm