diff --git a/services/backup/src/DatabaseEntities/LogItem.cpp b/services/backup/src/DatabaseEntities/LogItem.cpp index 1cbaa3430..e9b59e58e 100644 --- a/services/backup/src/DatabaseEntities/LogItem.cpp +++ b/services/backup/src/DatabaseEntities/LogItem.cpp @@ -1,115 +1,148 @@ #include "LogItem.h" #include "Constants.h" #include "Tools.h" #include namespace comm { namespace network { namespace database { const std::string LogItem::FIELD_BACKUP_ID = "backupID"; const std::string LogItem::FIELD_LOG_ID = "logID"; const std::string LogItem::FIELD_PERSISTED_IN_BLOB = "persistedInBlob"; const std::string LogItem::FIELD_VALUE = "value"; const std::string LogItem::FIELD_ATTACHMENT_HOLDERS = "attachmentHolders"; +const std::string LogItem::FIELD_DATA_HASH = "dataHash"; std::string LogItem::tableName = LOG_TABLE_NAME; LogItem::LogItem( const std::string backupID, const std::string logID, const bool persistedInBlob, const std::string value, - std::string attachmentHolders) + std::string attachmentHolders, + const std::string dataHash) : backupID(backupID), logID(logID), persistedInBlob(persistedInBlob), value(value), - attachmentHolders(attachmentHolders) { + attachmentHolders(attachmentHolders), + dataHash(dataHash) { this->validate(); } LogItem::LogItem(const AttributeValues &itemFromDB) { this->assignItemFromDatabase(itemFromDB); } void LogItem::validate() const { if (!this->backupID.size()) { throw std::runtime_error("backupID empty"); } if (!this->logID.size()) { throw std::runtime_error("logID empty"); } if (!this->value.size()) { throw std::runtime_error("value empty"); } - if (!this->persistedInBlob && - this->value.size() > LOG_DATA_SIZE_DATABASE_LIMIT) { + const size_t itemSize = LogItem::getItemSize(this); + if (!this->persistedInBlob && itemSize > LOG_DATA_SIZE_DATABASE_LIMIT) { throw std::runtime_error( "the value of this log is too big to be stored in the database, it " - "should be stored in the blob instead"); + "should be stored in the blob instead (" + + std::to_string(itemSize) + "/" + + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + ")"); + } + if (!this->dataHash.size()) { + throw std::runtime_error("data hash empty"); } } void LogItem::assignItemFromDatabase(const AttributeValues &itemFromDB) { try { this->backupID = itemFromDB.at(LogItem::FIELD_BACKUP_ID).GetS(); this->logID = itemFromDB.at(LogItem::FIELD_LOG_ID).GetS(); this->persistedInBlob = std::stoi( std::string(itemFromDB.at(LogItem::FIELD_PERSISTED_IN_BLOB).GetS()) .c_str()); this->value = itemFromDB.at(LogItem::FIELD_VALUE).GetS(); auto attachmentsHolders = itemFromDB.find(LogItem::FIELD_ATTACHMENT_HOLDERS); if (attachmentsHolders != itemFromDB.end()) { this->attachmentHolders = attachmentsHolders->second.GetS(); } + this->dataHash = itemFromDB.at(LogItem::FIELD_DATA_HASH).GetS(); } catch (std::logic_error &e) { throw std::runtime_error( "invalid log item provided, " + std::string(e.what())); } this->validate(); } std::string LogItem::getTableName() const { return LogItem::tableName; } PrimaryKeyDescriptor LogItem::getPrimaryKeyDescriptor() const { return PrimaryKeyDescriptor(LogItem::FIELD_BACKUP_ID, LogItem::FIELD_LOG_ID); } PrimaryKeyValue LogItem::getPrimaryKeyValue() const { return PrimaryKeyValue(this->backupID, this->logID); } std::string LogItem::getBackupID() const { return this->backupID; } std::string LogItem::getLogID() const { return this->logID; } bool LogItem::getPersistedInBlob() const { return this->persistedInBlob; } std::string LogItem::getValue() const { return this->value; } std::string LogItem::getAttachmentHolders() const { return this->attachmentHolders; } +std::string LogItem::getDataHash() const { + return this->dataHash; +} + void LogItem::addAttachmentHolders(const std::string &attachmentHolders) { this->attachmentHolders += tools::validateAttachmentHolders(attachmentHolders); } +size_t LogItem::getItemSize(const LogItem *item) { + size_t size = 0; + + size += LogItem::FIELD_BACKUP_ID.size(); + size += LogItem::FIELD_LOG_ID.size(); + size += LogItem::FIELD_PERSISTED_IN_BLOB.size(); + size += LogItem::FIELD_VALUE.size(); + size += LogItem::FIELD_ATTACHMENT_HOLDERS.size(); + size += LogItem::FIELD_DATA_HASH.size(); + + size += item->getBackupID().size(); + size += item->getLogID().size(); + size += std::to_string(item->getPersistedInBlob()).size(); + size += item->getValue().size(); + size += item->getAttachmentHolders().size(); + size += item->getDataHash().size(); + + return size; +} + } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/DatabaseEntities/LogItem.h b/services/backup/src/DatabaseEntities/LogItem.h index f3c443395..4f489ba6f 100644 --- a/services/backup/src/DatabaseEntities/LogItem.h +++ b/services/backup/src/DatabaseEntities/LogItem.h @@ -1,64 +1,70 @@ #pragma once #include "Item.h" #include namespace comm { namespace network { namespace database { /* * log - a single log record * `backupID` - id of the backup that this log is assigned to * `value` - either the value itself which is a dump of a single operation (if * `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is * true) * `attachmentHolders` - this is a list of attachment references */ class LogItem : public Item { std::string backupID; std::string logID; bool persistedInBlob; std::string value; std::string attachmentHolders; + std::string dataHash; void validate() const override; public: static std::string tableName; static const std::string FIELD_BACKUP_ID; static const std::string FIELD_LOG_ID; static const std::string FIELD_PERSISTED_IN_BLOB; static const std::string FIELD_VALUE; static const std::string FIELD_ATTACHMENT_HOLDERS; + static const std::string FIELD_DATA_HASH; LogItem() { } LogItem( const std::string backupID, const std::string logID, const bool persistedInBlob, const std::string value, - std::string attachmentHolders); + std::string attachmentHolders, + const std::string dataHash); LogItem(const AttributeValues &itemFromDB); void assignItemFromDatabase(const AttributeValues &itemFromDB) override; std::string getTableName() const override; PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override; PrimaryKeyValue getPrimaryKeyValue() const override; std::string getBackupID() const; std::string getLogID() const; bool getPersistedInBlob() const; std::string getValue() const; std::string getAttachmentHolders() const; + std::string getDataHash() const; void addAttachmentHolders(const std::string &attachmentHolders); + + static size_t getItemSize(const LogItem *item); }; } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index e96f14c5b..49cf367c0 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,163 +1,164 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->logID, (this->persistenceMethod == PersistenceMethod::BLOB), this->value, - {}); + {}, + this->hash); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { return backupID + tools::ID_SEPARATOR + std::to_string(tools::getCurrentTimestamp()); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) == nullptr) { throw std::runtime_error( "trying to send log for a non-existent backup"); } this->logID = this->generateLogID(this->backupID); this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return nullptr; } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = tools::generateHolder(this->hash, this->backupID, this->logID); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->getStatusHolder()->getStatus().error_code() << "/" << this->getStatusHolder()->getStatus().error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm