diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,5 +1,6 @@ #pragma once +#include "LogItem.h" #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" @@ -35,11 +36,10 @@ std::string logID; std::string backupID; std::string hash; - // either the value itself which is a dump of a single operation (if - // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is - // true) + std::string blobHolder; std::string value; std::mutex reactorStateMutex; + database::LogItem logItem; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -12,13 +12,20 @@ namespace reactor { void SendLogReactor::storeInDatabase() { - // TODO handle attachment holders database::LogItem logItem( this->backupID, this->logID, (this->persistenceMethod == PersistenceMethod::BLOB), - this->value, - {}); + (this->persistenceMethod == PersistenceMethod::BLOB) ? this->blobHolder + : this->value, + {}, + this->hash); + if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) { + throw std::runtime_error( + "trying to put into the database an item that exceeds the limit (" + + std::to_string(database::LogItem::getItemSize(&logItem)) + "/" + + std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + ")"); + } database::DatabaseManager::getInstance().putLogItem(logItem); } @@ -28,9 +35,9 @@ } void SendLogReactor::initializePutReactor() { - if (this->value.empty()) { + if (this->blobHolder.empty()) { throw std::runtime_error( - "put reactor cannot be initialized with empty value"); + "put reactor cannot be initialized with empty blob holder"); } if (this->hash.empty()) { throw std::runtime_error( @@ -38,7 +45,7 @@ } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( - this->value, this->hash, &this->blobPutDoneCV); + this->blobHolder, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } @@ -89,39 +96,23 @@ if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } - // decide if keep in DB or upload to blob - if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::DB; - this->value = std::move(*chunk); - this->storeInDatabase(); - return nullptr; - } else if (this->persistenceMethod == PersistenceMethod::BLOB) { - this->initializePutReactor(); - this->putReactor->scheduleSendingDataChunk(std::move(chunk)); - } else { - throw std::runtime_error( - "error - invalid persistence state for chunk smaller than " - "database limit"); - } - } else { - if (this->persistenceMethod != PersistenceMethod::UNKNOWN && - this->persistenceMethod != PersistenceMethod::BLOB) { - throw std::runtime_error( - "error - invalid persistence state, uploading to blob should be " - "continued but it is not"); - } - if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { - this->persistenceMethod = PersistenceMethod::BLOB; - } - if (this->value.empty()) { - this->value = - tools::generateHolder(this->hash, this->backupID, this->logID); - } - this->initializePutReactor(); + if (this->persistenceMethod == PersistenceMethod::BLOB) { this->putReactor->scheduleSendingDataChunk(std::move(chunk)); + return nullptr; + } + this->value += std::move(*chunk); + database::LogItem logItem = database::LogItem( + this->backupID, this->logID, true, this->value, "", this->hash); + if (database::LogItem::getItemSize(&logItem) > + LOG_DATA_SIZE_DATABASE_LIMIT) { + this->persistenceMethod = PersistenceMethod::BLOB; + this->blobHolder = + tools::generateHolder(this->hash, this->backupID, this->logID); + this->initializePutReactor(); + this->putReactor->scheduleSendingDataChunk( + std::make_unique(this->value)); + this->value = ""; } - return nullptr; }; } @@ -131,15 +122,18 @@ void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); - if (this->persistenceMethod == PersistenceMethod::DB || + if (this->persistenceMethod != PersistenceMethod::BLOB || this->putReactor == nullptr) { + this->storeInDatabase(); + this->persistenceMethod = PersistenceMethod::DB; return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); - } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { + } + if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); }