Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3536065
D3586.id11039.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Referenced Files
None
Subscribers
None
D3586.id11039.diff
View Options
diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
--- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
@@ -2,6 +2,8 @@
#include "Constants.h"
#include "ServerReadReactorBase.h"
+#include "ServiceBlobClient.h"
+#include "Tools.h"
#include "../_generated/backup.grpc.pb.h"
#include "../_generated/backup.pb.h"
@@ -19,7 +21,9 @@
google::protobuf::Empty> {
enum class State {
USER_ID = 1,
- LOG_CHUNK = 2,
+ BACKUP_ID = 2,
+ LOG_HASH = 3,
+ LOG_CHUNK = 4,
};
enum class PersistenceMethod {
@@ -31,12 +35,23 @@
State state = State::USER_ID;
PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN;
std::string userID;
+ std::string backupID;
+ std::string hash;
+ // either the value itself which is a dump of a single operation (if
+ // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is
+ // true)
+ std::string value;
+ std::mutex reactorStateMutex;
+ std::condition_variable waitingForBlobClientCV;
+ std::mutex waitingForBlobClientCVMutex;
- void storeInDatabase(const std::string &data) {
- }
+ std::shared_ptr<reactor::BlobPutClientReactor> putReactor;
+ ServiceBlobClient blobClient;
- void storeInBlob(const std::string &data) {
- }
+ void storeInDatabase();
+ std::string generateHolder();
+ std::string generateLogID();
+ void initializePutReactor();
public:
using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
@@ -45,16 +60,68 @@
std::unique_ptr<grpc::Status>
readRequest(backup::SendLogRequest request) override;
void doneCallback() override;
+ void terminateCallback() override;
};
+void SendLogReactor::storeInDatabase() {
+ // TODO handle attachment holders
+ database::LogItem logItem(
+ this->backupID,
+ this->generateLogID(),
+ (this->persistenceMethod == PersistenceMethod::BLOB),
+ this->value,
+ {});
+ database::DatabaseManager::getInstance().putLogItem(logItem);
+}
+
+std::string SendLogReactor::generateHolder() {
+ // TODO replace mock
+ return generateRandomString();
+}
+
+std::string SendLogReactor::generateLogID() {
+ // TODO replace mock
+ return generateRandomString();
+}
+
+void SendLogReactor::initializePutReactor() {
+ if (this->value.empty() || this->hash.empty()) {
+ throw std::runtime_error("not enough data to initialize put reactor");
+ }
+ if (this->putReactor == nullptr) {
+ this->putReactor = std::make_shared<reactor::BlobPutClientReactor>(
+ this->value, this->hash, &this->waitingForBlobClientCV);
+ this->blobClient.put(this->putReactor);
+ }
+}
+
std::unique_ptr<grpc::Status>
SendLogReactor::readRequest(backup::SendLogRequest request) {
+ // we make sure that the blob client's state is flushed to the main memory
+ // as there may be multiple threads from the pool taking over here
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
switch (this->state) {
case State::USER_ID: {
if (!request.has_userid()) {
throw std::runtime_error("user id expected but not received");
}
this->userID = request.userid();
+ this->state = State::BACKUP_ID;
+ return nullptr;
+ };
+ case State::BACKUP_ID: {
+ if (!request.has_backupid()) {
+ throw std::runtime_error("backup id expected but not received");
+ }
+ this->backupID = request.backupid();
+ this->state = State::LOG_HASH;
+ return nullptr;
+ };
+ case State::LOG_HASH: {
+ if (!request.has_loghash()) {
+ throw std::runtime_error("log hash expected but not received");
+ }
+ this->hash = request.loghash();
this->state = State::LOG_CHUNK;
return nullptr;
};
@@ -62,18 +129,21 @@
if (!request.has_logdata()) {
throw std::runtime_error("log data expected but not received");
}
- if (this->persistenceMethod == PersistenceMethod::DB) {
- throw std::runtime_error(
- "storing multiple chunks in the database is not allowed");
+ std::unique_ptr<std::string> chunk =
+ std::make_unique<std::string>(std::move(*request.mutable_logdata()));
+ if (chunk->size() == 0) {
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
}
- std::string *chunk = request.mutable_logdata();
// decide if keep in DB or upload to blob
if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) {
if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
this->persistenceMethod = PersistenceMethod::DB;
- this->storeInDatabase(*chunk);
+ this->value = std::move(*chunk);
+ this->storeInDatabase();
+ return std::make_unique<grpc::Status>(grpc::Status::OK);
} else if (this->persistenceMethod == PersistenceMethod::BLOB) {
- this->storeInBlob(*chunk);
+ this->initializePutReactor();
+ this->putReactor->scheduleSendingDataChunk(std::move(chunk));
}
throw std::runtime_error(
"error - invalid persistence state for chunk smaller than database "
@@ -85,17 +155,38 @@
"error - invalid persistence state, uploading to blob should be "
"continued but it is not");
}
- this->persistenceMethod = PersistenceMethod::BLOB;
- this->storeInBlob(*chunk);
+ if (this->persistenceMethod == PersistenceMethod::UNKNOWN) {
+ this->persistenceMethod = PersistenceMethod::BLOB;
+ }
+ if (this->value.empty()) {
+ this->value = this->generateHolder();
+ }
+ this->initializePutReactor();
+ this->putReactor->scheduleSendingDataChunk(std::move(chunk));
}
- std::cout << "log data received " << chunk->size() << std::endl;
+
return nullptr;
};
}
throw std::runtime_error("send log - invalid state");
}
+void SendLogReactor::terminateCallback() {
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
+
+ if (this->persistenceMethod == PersistenceMethod::DB ||
+ this->putReactor == nullptr) {
+ return;
+ }
+ this->putReactor->scheduleSendingDataChunk(std::make_unique<std::string>(""));
+ std::unique_lock<std::mutex> lock2(this->waitingForBlobClientCVMutex);
+ this->waitingForBlobClientCV.wait(lock2);
+ // store in db only when we successfully upload chunks
+ this->storeInDatabase();
+}
+
void SendLogReactor::doneCallback() {
+ const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
// TODO implement
std::cout << "receive logs done " << this->status.error_code() << "/"
<< this->status.error_message() << std::endl;
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 26, 5:22 PM (10 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2707722
Default Alt Text
D3586.id11039.diff (6 KB)
Attached To
Mode
D3586: [services] Backup - Send log - add store in database
Attached
Detach File
Event Timeline
Log In to Comment