diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h --- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h @@ -2,6 +2,8 @@ #include "Constants.h" #include "ServerReadReactorBase.h" +#include "ServiceBlobClient.h" +#include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" @@ -19,7 +21,9 @@ google::protobuf::Empty> { enum class State { USER_ID = 1, - LOG_CHUNK = 2, + BACKUP_ID = 2, + LOG_HASH = 3, + LOG_CHUNK = 4, }; enum class PersistenceMethod { @@ -31,12 +35,23 @@ State state = State::USER_ID; PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; std::string userID; + std::string backupID; + std::string hash; + // either the value itself which is a dump of a single operation (if + // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is + // true) + std::string value; + std::mutex reactorStateMutex; + std::condition_variable waitingForBlobClientCV; + std::mutex waitingForBlobClientCVMutex; - void storeInDatabase(const std::string &data) { - } + std::shared_ptr putReactor; + ServiceBlobClient blobClient; - void storeInBlob(const std::string &data) { - } + void storeInDatabase(); + std::string generateHolder(); + std::string generateLogID(); + void initializePutReactor(); public: using ServerReadReactorBase:: @@ -45,16 +60,68 @@ std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; + void terminateCallback() override; }; +void SendLogReactor::storeInDatabase() { + // TODO handle attachment holders + database::LogItem logItem( + this->backupID, + this->generateLogID(), + (this->persistenceMethod == PersistenceMethod::BLOB), + this->value, + {}); + database::DatabaseManager::getInstance().putLogItem(logItem); +} + +std::string SendLogReactor::generateHolder() { + // TODO replace mock + return generateRandomString(); +} + +std::string SendLogReactor::generateLogID() { + // TODO replace mock + return generateRandomString(); +} + +void SendLogReactor::initializePutReactor() { + if (this->value.empty() || this->hash.empty()) { + throw std::runtime_error("not enough data to initialize put reactor"); + } + if (this->putReactor == nullptr) { + this->putReactor = std::make_shared( + this->value, this->hash, &this->waitingForBlobClientCV); + this->blobClient.put(this->putReactor); + } +} + std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { + // we make sure that the blob client's state is flushed to the main memory + // as there may be multiple threads from the pool taking over here + const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); + this->state = State::BACKUP_ID; + return nullptr; + }; + case State::BACKUP_ID: { + if (!request.has_backupid()) { + throw std::runtime_error("backup id expected but not received"); + } + this->backupID = request.backupid(); + this->state = State::LOG_HASH; + return nullptr; + }; + case State::LOG_HASH: { + if (!request.has_loghash()) { + throw std::runtime_error("log hash expected but not received"); + } + this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; @@ -62,18 +129,21 @@ if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } - if (this->persistenceMethod == PersistenceMethod::DB) { - throw std::runtime_error( - "storing multiple chunks in the database is not allowed"); + std::unique_ptr chunk = + std::make_unique(std::move(*request.mutable_logdata())); + if (chunk->size() == 0) { + return std::make_unique(grpc::Status::OK); } - std::string *chunk = request.mutable_logdata(); // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; - this->storeInDatabase(*chunk); + this->value = std::move(*chunk); + this->storeInDatabase(); + return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { - this->storeInBlob(*chunk); + this->initializePutReactor(); + this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } throw std::runtime_error( "error - invalid persistence state for chunk smaller than database " @@ -85,17 +155,38 @@ "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } - this->persistenceMethod = PersistenceMethod::BLOB; - this->storeInBlob(*chunk); + if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { + this->persistenceMethod = PersistenceMethod::BLOB; + } + if (this->value.empty()) { + this->value = this->generateHolder(); + } + this->initializePutReactor(); + this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } - std::cout << "log data received " << chunk->size() << std::endl; + return nullptr; }; } throw std::runtime_error("send log - invalid state"); } +void SendLogReactor::terminateCallback() { + const std::lock_guard lock(this->reactorStateMutex); + + if (this->persistenceMethod == PersistenceMethod::DB || + this->putReactor == nullptr) { + return; + } + this->putReactor->scheduleSendingDataChunk(std::make_unique("")); + std::unique_lock lock2(this->waitingForBlobClientCVMutex); + this->waitingForBlobClientCV.wait(lock2); + // store in db only when we successfully upload chunks + this->storeInDatabase(); +} + void SendLogReactor::doneCallback() { + const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->status.error_code() << "/" << this->status.error_message() << std::endl;