diff --git a/services/backup/src/BackupServiceImpl.cpp b/services/backup/src/BackupServiceImpl.cpp --- a/services/backup/src/BackupServiceImpl.cpp +++ b/services/backup/src/BackupServiceImpl.cpp @@ -1,7 +1,7 @@ #include "BackupServiceImpl.h" +#include "AddAttachmentsReactor.h" #include "CreateNewBackupReactor.h" -#include "DatabaseManager.h" #include "PullBackupReactor.h" #include "RecoverBackupKeyReactor.h" #include "SendLogReactor.h" @@ -52,40 +52,8 @@ grpc::CallbackServerContext *context, const backup::AddAttachmentsRequest *request, google::protobuf::Empty *response) { - grpc::Status status = grpc::Status::OK; - std::string userID = request->userid(); - std::string backupID = request->backupid(); - std::string logID = request->logid(); - const std::string holders = request->holders(); - try { - if (userID.empty()) { - throw std::runtime_error("user id required but not provided"); - } - if (backupID.empty()) { - throw std::runtime_error("backup id required but not provided"); - } - if (holders.empty()) { - throw std::runtime_error("holders required but not provided"); - } - - if (logID.empty()) { - // add these attachments to backup - std::shared_ptr backupItem = - database::DatabaseManager::getInstance().findBackupItem( - userID, backupID); - backupItem->addAttachmentHolders(holders); - database::DatabaseManager::getInstance().putBackupItem(*backupItem); - } else { - // add these attachments to log - std::shared_ptr logItem = - database::DatabaseManager::getInstance().findLogItem(backupID, logID); - logItem->addAttachmentHolders(holders); - database::DatabaseManager::getInstance().putLogItem(*logItem); - } - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } + grpc::Status status = + reactor::AddAttachmentsReactor().processRequest(request); auto *reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; diff --git a/services/backup/src/Reactors/server/AddAttachmentsReactor.h b/services/backup/src/Reactors/server/AddAttachmentsReactor.h new file mode 100644 --- /dev/null +++ b/services/backup/src/Reactors/server/AddAttachmentsReactor.h @@ -0,0 +1,23 @@ +#pragma once + +#include "LogItem.h" + +#include "../_generated/backup.grpc.pb.h" +#include "../_generated/backup.pb.h" + +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +class AddAttachmentsReactor { + void moveToS3(std::shared_ptr logItem); +public: + grpc::Status processRequest(const backup::AddAttachmentsRequest *request); +}; + +} // namespace reactor +} // namespace network +} // namespace comm diff --git a/services/backup/src/Reactors/server/AddAttachmentsReactor.cpp b/services/backup/src/Reactors/server/AddAttachmentsReactor.cpp new file mode 100644 --- /dev/null +++ b/services/backup/src/Reactors/server/AddAttachmentsReactor.cpp @@ -0,0 +1,91 @@ +#include "AddAttachmentsReactor.h" + +#include "BackupItem.h" +#include "BlobPutClientReactor.h" +#include "Constants.h" +#include "DatabaseManager.h" +#include "ServiceBlobClient.h" +#include "Tools.h" + +namespace comm { +namespace network { +namespace reactor { + +grpc::Status AddAttachmentsReactor::processRequest( + const backup::AddAttachmentsRequest *request) { + grpc::Status status = grpc::Status::OK; + std::string userID = request->userid(); + std::string backupID = request->backupid(); + std::string logID = request->logid(); + const std::string holders = request->holders(); + try { + if (userID.empty()) { + throw std::runtime_error("user id required but not provided"); + } + if (backupID.empty()) { + throw std::runtime_error("backup id required but not provided"); + } + if (holders.empty()) { + throw std::runtime_error("holders required but not provided"); + } + + if (logID.empty()) { + // add these attachments to backup + std::shared_ptr backupItem = + database::DatabaseManager::getInstance().findBackupItem( + userID, backupID); + backupItem->addAttachmentHolders(holders); + database::DatabaseManager::getInstance().putBackupItem(*backupItem); + } else { + // add these attachments to log + std::shared_ptr logItem = + database::DatabaseManager::getInstance().findLogItem(backupID, logID); + logItem->addAttachmentHolders(holders); + if (!logItem->getPersistedInBlob() && + database::LogItem::getItemSize(logItem.get()) > + LOG_DATA_SIZE_DATABASE_LIMIT) { + this->moveToS3(logItem); + } + database::DatabaseManager::getInstance().putLogItem(*logItem); + } + } catch (std::runtime_error &e) { + std::cout << "error: " << e.what() << std::endl; + status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } +} + +void AddAttachmentsReactor::moveToS3( + std::shared_ptr logItem) { + std::string holder = tools::generateHolder( + logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID()); + std::string data = std::move(logItem->getValue()); + logItem = std::make_shared( + logItem->getBackupID(), + logItem->getLogID(), + true, + holder, + logItem->getAttachmentHolders(), + logItem->getDataHash()); + // put into S3 + std::condition_variable blobPutDoneCV; + std::mutex blobPutDoneCVMutex; + std::shared_ptr putReactor = + std::make_shared( + holder, logItem->getDataHash(), &blobPutDoneCV); + ServiceBlobClient().put(putReactor); + std::unique_lock lockPut(blobPutDoneCVMutex); + putReactor->scheduleSendingDataChunk( + std::make_unique(std::move(data))); + putReactor->scheduleSendingDataChunk(std::make_unique("")); + if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) { + blobPutDoneCV.wait(lockPut); + } + if (!putReactor->getStatusHolder()->getStatus().ok()) { + throw std::runtime_error( + putReactor->getStatusHolder()->getStatus().error_message()); + } +} + +} // namespace reactor +} // namespace network +} // namespace comm