diff --git a/services/backup/src/BackupServiceImpl.cpp b/services/backup/src/BackupServiceImpl.cpp index 56f5178cb..228440cc1 100644 --- a/services/backup/src/BackupServiceImpl.cpp +++ b/services/backup/src/BackupServiceImpl.cpp @@ -1,95 +1,63 @@ #include "BackupServiceImpl.h" +#include "AddAttachmentsUtility.h" #include "CreateNewBackupReactor.h" -#include "DatabaseManager.h" #include "PullBackupReactor.h" #include "RecoverBackupKeyReactor.h" #include "SendLogReactor.h" #include namespace comm { namespace network { BackupServiceImpl::BackupServiceImpl() { Aws::InitAPI({}); } BackupServiceImpl::~BackupServiceImpl() { Aws::ShutdownAPI({}); } grpc::ServerBidiReactor< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { return new reactor::CreateNewBackupReactor(); } grpc::ServerReadReactor *BackupServiceImpl::SendLog( grpc::CallbackServerContext *context, backup::SendLogResponse *response) { return new reactor::SendLogReactor(response); } grpc::ServerBidiReactor< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { return new reactor::RecoverBackupKeyReactor(); } grpc::ServerWriteReactor * BackupServiceImpl::PullBackup( grpc::CallbackServerContext *context, const backup::PullBackupRequest *request) { reactor::PullBackupReactor *reactor = new reactor::PullBackupReactor(request); reactor->start(); return reactor; } grpc::ServerUnaryReactor *BackupServiceImpl::AddAttachments( grpc::CallbackServerContext *context, const backup::AddAttachmentsRequest *request, google::protobuf::Empty *response) { - grpc::Status status = grpc::Status::OK; - std::string userID = request->userid(); - std::string backupID = request->backupid(); - std::string logID = request->logid(); - const std::string holders = request->holders(); - try { - if (userID.empty()) { - throw std::runtime_error("user id required but not provided"); - } - if (backupID.empty()) { - throw std::runtime_error("backup id required but not provided"); - } - if (holders.empty()) { - throw std::runtime_error("holders required but not provided"); - } - - if (logID.empty()) { - // add these attachments to backup - std::shared_ptr backupItem = - database::DatabaseManager::getInstance().findBackupItem( - userID, backupID); - backupItem->addAttachmentHolders(holders); - database::DatabaseManager::getInstance().putBackupItem(*backupItem); - } else { - // add these attachments to log - std::shared_ptr logItem = - database::DatabaseManager::getInstance().findLogItem(backupID, logID); - logItem->addAttachmentHolders(holders); - database::DatabaseManager::getInstance().putLogItem(*logItem); - } - } catch (std::runtime_error &e) { - std::cout << "error: " << e.what() << std::endl; - status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); - } + grpc::Status status = + reactor::AddAttachmentsUtility().processRequest(request); auto *reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; } } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp new file mode 100644 index 000000000..157b06316 --- /dev/null +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp @@ -0,0 +1,94 @@ +#include "AddAttachmentsUtility.h" + +#include "BackupItem.h" +#include "BlobPutClientReactor.h" +#include "Constants.h" +#include "DatabaseManager.h" +#include "ServiceBlobClient.h" +#include "Tools.h" + +namespace comm { +namespace network { +namespace reactor { + +grpc::Status AddAttachmentsUtility::processRequest( + const backup::AddAttachmentsRequest *request) { + grpc::Status status = grpc::Status::OK; + std::string userID = request->userid(); + std::string backupID = request->backupid(); + std::string logID = request->logid(); + const std::string holders = request->holders(); + try { + if (userID.empty()) { + throw std::runtime_error("user id required but not provided"); + } + if (backupID.empty()) { + throw std::runtime_error("backup id required but not provided"); + } + if (holders.empty()) { + throw std::runtime_error("holders required but not provided"); + } + + if (logID.empty()) { + // add these attachments to backup + std::shared_ptr backupItem = + database::DatabaseManager::getInstance().findBackupItem( + userID, backupID); + backupItem->addAttachmentHolders(holders); + database::DatabaseManager::getInstance().putBackupItem(*backupItem); + } else { + // add these attachments to log + std::shared_ptr logItem = + database::DatabaseManager::getInstance().findLogItem(backupID, logID); + logItem->addAttachmentHolders(holders); + if (!logItem->getPersistedInBlob() && + database::LogItem::getItemSize(logItem.get()) > + LOG_DATA_SIZE_DATABASE_LIMIT) { + bool old = logItem->getPersistedInBlob(); + logItem = this->moveToS3(logItem); + } + database::DatabaseManager::getInstance().putLogItem(*logItem); + } + } catch (std::runtime_error &e) { + std::cout << "error: " << e.what() << std::endl; + status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + return status; +} + +std::shared_ptr AddAttachmentsUtility::moveToS3( + std::shared_ptr logItem) { + std::string holder = tools::generateHolder( + logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID()); + std::string data = std::move(logItem->getValue()); + std::shared_ptr newLogItem = std::make_shared( + logItem->getBackupID(), + logItem->getLogID(), + true, + holder, + logItem->getAttachmentHolders(), + logItem->getDataHash()); + // put into S3 + std::condition_variable blobPutDoneCV; + std::mutex blobPutDoneCVMutex; + std::shared_ptr putReactor = + std::make_shared( + holder, newLogItem->getDataHash(), &blobPutDoneCV); + ServiceBlobClient().put(putReactor); + std::unique_lock lockPut(blobPutDoneCVMutex); + putReactor->scheduleSendingDataChunk( + std::make_unique(std::move(data))); + putReactor->scheduleSendingDataChunk(std::make_unique("")); + if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) { + blobPutDoneCV.wait(lockPut); + } + if (!putReactor->getStatusHolder()->getStatus().ok()) { + throw std::runtime_error( + putReactor->getStatusHolder()->getStatus().error_message()); + } + return newLogItem; +} + +} // namespace reactor +} // namespace network +} // namespace comm diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.h b/services/backup/src/Reactors/server/AddAttachmentsUtility.h new file mode 100644 index 000000000..e9f58c40c --- /dev/null +++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.h @@ -0,0 +1,23 @@ +#pragma once + +#include "LogItem.h" + +#include "../_generated/backup.grpc.pb.h" +#include "../_generated/backup.pb.h" + +#include +#include + +namespace comm { +namespace network { +namespace reactor { + +class AddAttachmentsUtility { + std::shared_ptr moveToS3(std::shared_ptr logItem); +public: + grpc::Status processRequest(const backup::AddAttachmentsRequest *request); +}; + +} // namespace reactor +} // namespace network +} // namespace comm