diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index d13e3be2d..a3026a72d 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,176 +1,164 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { - // TODO handle attachment holders + bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB; database::LogItem logItem( this->backupID, this->logID, - (this->persistenceMethod == PersistenceMethod::BLOB), - this->value, + storedInBlob, + storedInBlob ? this->blobHolder : this->value, {}, this->hash); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { return backupID + tools::ID_SEPARATOR + std::to_string(tools::getCurrentTimestamp()); } void SendLogReactor::initializePutReactor() { - if (this->value.empty()) { + if (this->blobHolder.empty()) { throw std::runtime_error( - "put reactor cannot be initialized with empty value"); + "put reactor cannot be initialized with empty blob holder"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( - this->value, this->hash, &this->blobPutDoneCV); + this->blobHolder, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) == nullptr) { throw std::runtime_error( "trying to send log for a non-existent backup"); } this->logID = this->generateLogID(this->backupID); this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } - // decide if keep in DB or upload to blob - if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::DB; - this->value = std::move(*chunk); - this->storeInDatabase(); - return nullptr; - } else if (this->persistenceMethod == PersistenceMethod::BLOB) { - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } else { + if (this->persistenceMethod == PersistenceMethod::BLOB) { + if (this->putReactor == nullptr) { throw std::runtime_error( - "error - invalid persistence state for chunk smaller than " - "database limit"); + "put reactor is being used but has not been initialized"); } - } else { - if (this->persistenceMethod != PersistenceMethod::UNKNOWN && - this->persistenceMethod != PersistenceMethod::BLOB) { - throw std::runtime_error( - "error - invalid persistence state, uploading to blob should be " - "continued but it is not"); - } - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::BLOB; - } - if (this->value.empty()) { - this->value = - tools::generateHolder(this->hash, this->backupID, this->logID); - } - this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); + return nullptr; + } + this->value += std::move(*chunk); + database::LogItem logItem = database::LogItem( + this->backupID, this->logID, true, this->value, "", this->hash); + if (database::LogItem::getItemSize(&logItem) > + LOG_DATA_SIZE_DATABASE_LIMIT) { + this->persistenceMethod = PersistenceMethod::BLOB; + this->blobHolder = + tools::generateHolder(this->hash, this->backupID, this->logID); + this->initializePutReactor(); + this->putReactor->scheduleSendingDataChunk( + std::make_unique(this->value)); + this->value = ""; } - return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (!this->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->getStatusHolder()->getStatus().error_message()); } if (this->persistenceMethod != PersistenceMethod::BLOB && this->persistenceMethod != PersistenceMethod::DB) { throw std::runtime_error("Invalid persistence method detected"); } if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { this->storeInDatabase(); return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); } if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->getStatusHolder()->getStatus().error_code() << "/" << this->getStatusHolder()->getStatus().error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h index 69c3679de..615e7c431 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,66 +1,65 @@ #pragma once +#include "LogItem.h" #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include namespace comm { namespace network { namespace reactor { class SendLogReactor : public ServerReadReactorBase< backup::SendLogRequest, backup::SendLogResponse> { enum class State { USER_ID = 1, BACKUP_ID = 2, LOG_HASH = 3, LOG_CHUNK = 4, }; enum class PersistenceMethod { UNKNOWN = 0, DB = 1, BLOB = 2, }; State state = State::USER_ID; PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; std::string userID; std::string logID; std::string backupID; std::string hash; - // either the value itself which is a dump of a single operation (if - // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is - // true) + std::string blobHolder; std::string value; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; std::shared_ptr putReactor; ServiceBlobClient blobClient; void storeInDatabase(); std::string generateLogID(const std::string &backupID); void initializePutReactor(); public: using ServerReadReactorBase:: ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; void terminateCallback() override; }; } // namespace reactor } // namespace network } // namespace comm