Page MenuHomePhabricator

D3586.id11039.diff
No OneTemporary

D3586.id11039.diff

diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
@@ -2,6 +2,8 @@
#include "Constants.h"
#include "ServerReadReactorBase.h"
+#include "ServiceBlobClient.h"
+#include "Tools.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
@@ -19,7 +21,9 @@
google::protobuf::Empty> {
enum class State {
USER_ID = 1,
- LOG_CHUNK = 2,
+ BACKUP_ID = 2,
+ LOG_HASH = 3,
+ LOG_CHUNK = 4,
};
enum class PersistenceMethod {
@@ -31,12 +35,23 @@
State state = State::USER_ID;
PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN;
std::string userID;
+ std::string backupID;
+ std::string hash;
+ // either the value itself which is a dump of a single operation (if
+ // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is
+ // true)
+ std::string value;
+ std::mutex reactorStateMutex;
+ std::condition_variable waitingForBlobClientCV;
+ std::mutex waitingForBlobClientCVMutex;
- void storeInDatabase(const std::string &data) {
- }
+ std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
+ ServiceBlobClient blobClient;
- void storeInBlob(const std::string &data) {
- }
+ void storeInDatabase();
+ std::string generateHolder();
+ std::string generateLogID();
+ void initializePutReactor();
public:
using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
@@ -45,16 +60,68 @@
std::unique_ptr<grpc::Status>
readRequest(backup::SendLogRequest request) override;
void doneCallback() override;
+ void terminateCallback() override;
};
+void SendLogReactor::storeInDatabase() {
+ // TODO handle attachment holders
+ database::LogItem logItem(
+ this->backupID,
+ this->generateLogID(),
+ (this->persistenceMethod == PersistenceMethod::BLOB),
+ this->value,
+ {});
+ database::DatabaseManager::getInstance().putLogItem(logItem);
+}
+
+std::string SendLogReactor::generateHolder() {
+ // TODO replace mock
+ return generateRandomString();
+}
+
+std::string SendLogReactor::generateLogID() {
+ // TODO replace mock
+ return generateRandomString();
+}
+
+void SendLogReactor::initializePutReactor() {
+ if (this->value.empty() || this->hash.empty()) {
+ throw std::runtime_error("not enough data to initialize put reactor");
+ }
+ if (this->putReactor == nullptr) {
+ this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
+ this->value, this->hash, &this->waitingForBlobClientCV);
+ this->blobClient.put(this->putReactor);
+ }
+}
+
std::unique_ptr<grpc::Status>
SendLogReactor::readRequest(backup::SendLogRequest request) {
+ // we make sure that the blob client's state is flushed to the main memory
+ // as there may be multiple threads from the pool taking over here
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
switch (this->state) {
case State::USER_ID: {
if (!request.has_userid()) {
throw std::runtime_error("user id expected but not received");
}
this->userID = request.userid();
+ this->state = State::BACKUP_ID;
+ return nullptr;
+ };
+ case State::BACKUP_ID: {
+ if (!request.has_backupid()) {
+ throw std::runtime_error("backup id expected but not received");
+ }
+ this->backupID = request.backupid();
+ this->state = State::LOG_HASH;
+ return nullptr;
+ };
+ case State::LOG_HASH: {
+ if (!request.has_loghash()) {
+ throw std::runtime_error("log hash expected but not received");
+ }
+ this->hash = request.loghash();
this->state = State::LOG_CHUNK;
return nullptr;
};
@@ -62,18 +129,21 @@
if (!request.has_logdata()) {
throw std::runtime_error("log data expected but not received");
}
- if (this->persistenceMethod == PersistenceMethod::DB) {
- throw std::runtime_error(
- "storing multiple chunks in the database is not allowed");
+ std::unique_ptr<std::string> chunk =
+ std::make_unique<std::string>(std::move(*request.mutable_logdata()));
+ if (chunk->size() == 0) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
}
- std::string *chunk = request.mutable_logdata();
// decide if keep in DB or upload to blob
if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) {
if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
this->persistenceMethod = PersistenceMethod::DB;
- this->storeInDatabase(*chunk);
+ this->value = std::move(*chunk);
+ this->storeInDatabase();
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
} else if (this->persistenceMethod == PersistenceMethod::BLOB) {
- this->storeInBlob(*chunk);
+ this->initializePutReactor();
+ this->putReactor->scheduleSendingDataChunk(std::move(chunk));
}
throw std::runtime_error(
"error - invalid persistence state for chunk smaller than database "
@@ -85,17 +155,38 @@
"error - invalid persistence state, uploading to blob should be "
"continued but it is not");
}
- this->persistenceMethod = PersistenceMethod::BLOB;
- this->storeInBlob(*chunk);
+ if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
+ this->persistenceMethod = PersistenceMethod::BLOB;
+ }
+ if (this->value.empty()) {
+ this->value = this->generateHolder();
+ }
+ this->initializePutReactor();
+ this->putReactor->scheduleSendingDataChunk(std::move(chunk));
}
- std::cout << "log data received " << chunk->size() << std::endl;
+
return nullptr;
};
}
throw std::runtime_error("send log - invalid state");
}
+void SendLogReactor::terminateCallback() {
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+
+ if (this->persistenceMethod == PersistenceMethod::DB ||
+ this->putReactor == nullptr) {
+ return;
+ }
+ this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
+ std::unique_lock<std::mutex> lock2(this->waitingForBlobClientCVMutex);
+ this->waitingForBlobClientCV.wait(lock2);
+ // store in db only when we successfully upload chunks
+ this->storeInDatabase();
+}
+
void SendLogReactor::doneCallback() {
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
// TODO implement
std::cout << "receive logs done " << this->status.error_code() << "/"
<< this->status.error_message() << std::endl;

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 26, 5:22 PM (10 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2707722
Default Alt Text
D3586.id11039.diff (6 KB)

Event Timeline