Page MenuHomePhabricator

D4321.id13663.diff
No OneTemporary

D4321.id13663.diff

diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h
--- a/services/backup/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/src/Reactors/server/SendLogReactor.h
@@ -1,5 +1,6 @@
#pragma once
+#include "LogItem.h"
#include "ServerReadReactorBase.h"
#include "ServiceBlobClient.h"
@@ -35,11 +36,10 @@
std::string logID;
std::string backupID;
std::string hash;
- // either the value itself which is a dump of a single operation (if
- // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is
- // true)
+ std::string blobHolder;
std::string value;
std::mutex reactorStateMutex;
+ database::LogItem logItem;
std::condition_variable blobPutDoneCV;
std::mutex blobPutDoneCVMutex;
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -12,14 +12,20 @@
namespace reactor {
void SendLogReactor::storeInDatabase() {
- // TODO handle attachment holders
database::LogItem logItem(
this->backupID,
this->logID,
(this->persistenceMethod == PersistenceMethod::BLOB),
- this->value,
+ (this->persistenceMethod == PersistenceMethod::BLOB) ? this->blobHolder
+ : this->value,
{},
this->hash);
+ if (database::LogItem::getItemSize(&logItem) > LOG_DATA_SIZE_DATABASE_LIMIT) {
+ throw std::runtime_error(
+ "trying to put into the database an item that exceeds the limit (" +
+ std::to_string(database::LogItem::getItemSize(&logItem)) + "/" +
+ std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) + ")");
+ }
database::DatabaseManager::getInstance().putLogItem(logItem);
}
@@ -29,9 +35,9 @@
}
void SendLogReactor::initializePutReactor() {
- if (this->value.empty()) {
+ if (this->blobHolder.empty()) {
throw std::runtime_error(
- "put reactor cannot be initialized with empty value");
+ "put reactor cannot be initialized with empty blob holder");
}
if (this->hash.empty()) {
throw std::runtime_error(
@@ -39,7 +45,7 @@
}
if (this->putReactor == nullptr) {
this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
- this->value, this->hash, &this->blobPutDoneCV);
+ this->blobHolder, this->hash, &this->blobPutDoneCV);
this->blobClient.put(this->putReactor);
}
}
@@ -90,39 +96,23 @@
if (chunk->size() == 0) {
return std::make_unique<grpc::Status>(grpc::Status::OK);
}
- // decide if keep in DB or upload to blob
- if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) {
- if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
- this->persistenceMethod = PersistenceMethod::DB;
- this->value = std::move(*chunk);
- this->storeInDatabase();
- return nullptr;
- } else if (this->persistenceMethod == PersistenceMethod::BLOB) {
- this->initializePutReactor();
- this->putReactor->scheduleSendingDataChunk(std::move(chunk));
- } else {
- throw std::runtime_error(
- "error - invalid persistence state for chunk smaller than "
- "database limit");
- }
- } else {
- if (this->persistenceMethod != PersistenceMethod::UNKNOWN &&
- this->persistenceMethod != PersistenceMethod::BLOB) {
- throw std::runtime_error(
- "error - invalid persistence state, uploading to blob should be "
- "continued but it is not");
- }
- if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
- this->persistenceMethod = PersistenceMethod::BLOB;
- }
- if (this->value.empty()) {
- this->value =
- tools::generateHolder(this->hash, this->backupID, this->logID);
- }
- this->initializePutReactor();
+ if (this->persistenceMethod == PersistenceMethod::BLOB) {
this->putReactor->scheduleSendingDataChunk(std::move(chunk));
+ return nullptr;
+ }
+ this->value += std::move(*chunk);
+ database::LogItem logItem = database::LogItem(
+ this->backupID, this->logID, true, this->value, "", this->hash);
+ if (database::LogItem::getItemSize(&logItem) >
+ LOG_DATA_SIZE_DATABASE_LIMIT) {
+ this->persistenceMethod = PersistenceMethod::BLOB;
+ this->blobHolder =
+ tools::generateHolder(this->hash, this->backupID, this->logID);
+ this->initializePutReactor();
+ this->putReactor->scheduleSendingDataChunk(
+ std::make_unique<std::string>(this->value));
+ this->value = "";
}
-
return nullptr;
};
}
@@ -132,15 +122,18 @@
void SendLogReactor::terminateCallback() {
const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
- if (this->persistenceMethod == PersistenceMethod::DB ||
+ if (this->persistenceMethod != PersistenceMethod::BLOB ||
this->putReactor == nullptr) {
+ this->persistenceMethod = PersistenceMethod::DB;
+ this->storeInDatabase();
return;
}
this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
std::unique_lock<std::mutex> lockPut(this->blobPutDoneCVMutex);
if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) {
this->blobPutDoneCV.wait(lockPut);
- } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) {
+ }
+ if (!this->putReactor->getStatusHolder()->getStatus().ok()) {
throw std::runtime_error(
this->putReactor->getStatusHolder()->getStatus().error_message());
}

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 21, 4:58 AM (18 h, 21 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2685970
Default Alt Text
D4321.id13663.diff (5 KB)

Event Timeline