Page MenuHomePhabricator

D4323.id14029.diff
No OneTemporary

D4323.id14029.diff

diff --git a/services/backup/src/BackupServiceImpl.cpp b/services/backup/src/BackupServiceImpl.cpp
--- a/services/backup/src/BackupServiceImpl.cpp
+++ b/services/backup/src/BackupServiceImpl.cpp
@@ -1,7 +1,7 @@
#include "BackupServiceImpl.h"
+#include "AddAttachmentsReactor.h"
#include "CreateNewBackupReactor.h"
-#include "DatabaseManager.h"
#include "PullBackupReactor.h"
#include "RecoverBackupKeyReactor.h"
#include "SendLogReactor.h"
@@ -52,40 +52,8 @@
grpc::CallbackServerContext *context,
const backup::AddAttachmentsRequest *request,
google::protobuf::Empty *response) {
- grpc::Status status = grpc::Status::OK;
- std::string userID = request->userid();
- std::string backupID = request->backupid();
- std::string logID = request->logid();
- const std::string holders = request->holders();
- try {
- if (userID.empty()) {
- throw std::runtime_error("user id required but not provided");
- }
- if (backupID.empty()) {
- throw std::runtime_error("backup id required but not provided");
- }
- if (holders.empty()) {
- throw std::runtime_error("holders required but not provided");
- }
-
- if (logID.empty()) {
- // add these attachments to backup
- std::shared_ptr<database::BackupItem> backupItem =
- database::DatabaseManager::getInstance().findBackupItem(
- userID, backupID);
- backupItem->addAttachmentHolders(holders);
- database::DatabaseManager::getInstance().putBackupItem(*backupItem);
- } else {
- // add these attachments to log
- std::shared_ptr<database::LogItem> logItem =
- database::DatabaseManager::getInstance().findLogItem(backupID, logID);
- logItem->addAttachmentHolders(holders);
- database::DatabaseManager::getInstance().putLogItem(*logItem);
- }
- } catch (std::runtime_error &e) {
- std::cout << "error: " << e.what() << std::endl;
- status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
+ grpc::Status status =
+ reactor::AddAttachmentsReactor().processRequest(request);
auto *reactor = context->DefaultReactor();
reactor->Finish(status);
return reactor;
diff --git a/services/backup/src/Reactors/server/AddAttachmentsReactor.h b/services/backup/src/Reactors/server/AddAttachmentsReactor.h
new file mode 100644
--- /dev/null
+++ b/services/backup/src/Reactors/server/AddAttachmentsReactor.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "LogItem.h"
+
+#include "../_generated/backup.grpc.pb.h"
+#include "../_generated/backup.pb.h"
+
+#include <grpcpp/grpcpp.h>
+#include <memory>
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+class AddAttachmentsReactor {
+ void moveToS3(std::shared_ptr<database::LogItem> logItem);
+public:
+ grpc::Status processRequest(const backup::AddAttachmentsRequest *request);
+};
+
+} // namespace reactor
+} // namespace network
+} // namespace comm
diff --git a/services/backup/src/Reactors/server/AddAttachmentsReactor.cpp b/services/backup/src/Reactors/server/AddAttachmentsReactor.cpp
new file mode 100644
--- /dev/null
+++ b/services/backup/src/Reactors/server/AddAttachmentsReactor.cpp
@@ -0,0 +1,91 @@
+#include "AddAttachmentsReactor.h"
+
+#include "BackupItem.h"
+#include "BlobPutClientReactor.h"
+#include "Constants.h"
+#include "DatabaseManager.h"
+#include "ServiceBlobClient.h"
+#include "Tools.h"
+
+namespace comm {
+namespace network {
+namespace reactor {
+
+grpc::Status AddAttachmentsReactor::processRequest(
+ const backup::AddAttachmentsRequest *request) {
+ grpc::Status status = grpc::Status::OK;
+ std::string userID = request->userid();
+ std::string backupID = request->backupid();
+ std::string logID = request->logid();
+ const std::string holders = request->holders();
+ try {
+ if (userID.empty()) {
+ throw std::runtime_error("user id required but not provided");
+ }
+ if (backupID.empty()) {
+ throw std::runtime_error("backup id required but not provided");
+ }
+ if (holders.empty()) {
+ throw std::runtime_error("holders required but not provided");
+ }
+
+ if (logID.empty()) {
+ // add these attachments to backup
+ std::shared_ptr<database::BackupItem> backupItem =
+ database::DatabaseManager::getInstance().findBackupItem(
+ userID, backupID);
+ backupItem->addAttachmentHolders(holders);
+ database::DatabaseManager::getInstance().putBackupItem(*backupItem);
+ } else {
+ // add these attachments to log
+ std::shared_ptr<database::LogItem> logItem =
+ database::DatabaseManager::getInstance().findLogItem(backupID, logID);
+ logItem->addAttachmentHolders(holders);
+ if (!logItem->getPersistedInBlob() &&
+ database::LogItem::getItemSize(logItem.get()) >
+ LOG_DATA_SIZE_DATABASE_LIMIT) {
+ this->moveToS3(logItem);
+ }
+ database::DatabaseManager::getInstance().putLogItem(*logItem);
+ }
+ } catch (std::runtime_error &e) {
+ std::cout << "error: " << e.what() << std::endl;
+ status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ }
+}
+
+void AddAttachmentsReactor::moveToS3(
+ std::shared_ptr<database::LogItem> logItem) {
+ std::string holder = tools::generateHolder(
+ logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID());
+ std::string data = std::move(logItem->getValue());
+ logItem = std::make_shared<database::LogItem>(
+ logItem->getBackupID(),
+ logItem->getLogID(),
+ true,
+ holder,
+ logItem->getAttachmentHolders(),
+ logItem->getDataHash());
+ // put into S3
+ std::condition_variable blobPutDoneCV;
+ std::mutex blobPutDoneCVMutex;
+ std::shared_ptr<reactor::BlobPutClientReactor> putReactor =
+ std::make_shared<reactor::BlobPutClientReactor>(
+ holder, logItem->getDataHash(), &blobPutDoneCV);
+ ServiceBlobClient().put(putReactor);
+ std::unique_lock<std::mutex> lockPut(blobPutDoneCVMutex);
+ putReactor->scheduleSendingDataChunk(
+ std::make_unique<std::string>(std::move(data)));
+ putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
+ if (putReactor->getStatusHolder()->state != reactor::ReactorState::DONE) {
+ blobPutDoneCV.wait(lockPut);
+ }
+ if (!putReactor->getStatusHolder()->getStatus().ok()) {
+ throw std::runtime_error(
+ putReactor->getStatusHolder()->getStatus().error_message());
+ }
+}
+
+} // namespace reactor
+} // namespace network
+} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Mon, Nov 18, 12:49 PM (20 h, 56 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2534955
Default Alt Text
D4323.id14029.diff (6 KB)

Event Timeline