diff --git a/native/cpp/CommonCpp/grpc/protos/backup.proto b/native/cpp/CommonCpp/grpc/protos/backup.proto index 82b59abcc..2a2d6c9ce 100644 --- a/native/cpp/CommonCpp/grpc/protos/backup.proto +++ b/native/cpp/CommonCpp/grpc/protos/backup.proto @@ -1,77 +1,81 @@ syntax = "proto3"; package backup; import "google/protobuf/empty.proto"; /** * API - description * CreateNewBackup - This method is called when we want to create a new backup. * We send a new backup key encrypted with the user's password and also the * new compaction. New logs that will be sent from now on will be assigned to * this backup. * SendLog - User sends a new log to the backup service. The log is being * assigned to the latest(or desired) backup's compaction item. * RecoverBackupKey - Pulls data necessary for regenerating the backup key * on the client-side for the latest(or desired) backup * PullBackup - Fetches compaction + all logs assigned to it for the * specified backup(default is the last backup) */ service BackupService { rpc CreateNewBackup(stream CreateNewBackupRequest) returns (stream CreateNewBackupResponse) {} - rpc SendLog(stream SendLogRequest) returns (google.protobuf.Empty) {} + rpc SendLog(stream SendLogRequest) returns (SendLogResponse) {} rpc RecoverBackupKey(stream RecoverBackupKeyRequest) returns (stream RecoverBackupKeyResponse) {} rpc PullBackup(PullBackupRequest) returns (stream PullBackupResponse) {} } // CreateNewBackup message CreateNewBackupRequest { oneof data { string userID = 1; string deviceID = 2; bytes keyEntropy = 3; bytes newCompactionHash = 4; bytes newCompactionChunk = 5; } } message CreateNewBackupResponse { string backupID = 1; } // SendLog message SendLogRequest { oneof data { string userID = 1; string backupID = 2; bytes logHash = 3; bytes logData = 4; } } +message SendLogResponse { + string logCheckpoint = 1; +} + // RecoverBackupKey message RecoverBackupKeyRequest { string userID = 1; } message RecoverBackupKeyResponse { string backupID = 4; } // PullBackup message PullBackupRequest { string userID = 1; string backupID = 2; } message PullBackupResponse { oneof data { bytes compactionChunk = 1; bytes logChunk = 2; } } diff --git a/services/backup/src/BackupServiceImpl.cpp b/services/backup/src/BackupServiceImpl.cpp index 975182f30..c697f91b7 100644 --- a/services/backup/src/BackupServiceImpl.cpp +++ b/services/backup/src/BackupServiceImpl.cpp @@ -1,51 +1,51 @@ #include "BackupServiceImpl.h" #include "CreateNewBackupReactor.h" #include "PullBackupReactor.h" #include "RecoverBackupKeyReactor.h" #include "SendLogReactor.h" #include namespace comm { namespace network { BackupServiceImpl::BackupServiceImpl() { Aws::InitAPI({}); } BackupServiceImpl::~BackupServiceImpl() { Aws::ShutdownAPI({}); } grpc::ServerBidiReactor< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * BackupServiceImpl::CreateNewBackup(grpc::CallbackServerContext *context) { return new reactor::CreateNewBackupReactor(); } grpc::ServerReadReactor *BackupServiceImpl::SendLog( grpc::CallbackServerContext *context, - google::protobuf::Empty *response) { + backup::SendLogResponse *response) { return new reactor::SendLogReactor(response); } grpc::ServerBidiReactor< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * BackupServiceImpl::RecoverBackupKey(grpc::CallbackServerContext *context) { return new reactor::RecoverBackupKeyReactor(); } grpc::ServerWriteReactor * BackupServiceImpl::PullBackup( grpc::CallbackServerContext *context, const backup::PullBackupRequest *request) { reactor::PullBackupReactor *reactor = new reactor::PullBackupReactor(request); reactor->start(); return reactor; } } // namespace network } // namespace comm diff --git a/services/backup/src/BackupServiceImpl.h b/services/backup/src/BackupServiceImpl.h index 683910679..ca1c6fe07 100644 --- a/services/backup/src/BackupServiceImpl.h +++ b/services/backup/src/BackupServiceImpl.h @@ -1,37 +1,37 @@ #pragma once #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include namespace comm { namespace network { class BackupServiceImpl final : public backup::BackupService::CallbackService { public: BackupServiceImpl(); virtual ~BackupServiceImpl(); grpc::ServerBidiReactor< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> * CreateNewBackup(grpc::CallbackServerContext *context) override; grpc::ServerReadReactor *SendLog( grpc::CallbackServerContext *context, - google::protobuf::Empty *response) override; + backup::SendLogResponse *response) override; grpc::ServerBidiReactor< backup::RecoverBackupKeyRequest, backup::RecoverBackupKeyResponse> * RecoverBackupKey(grpc::CallbackServerContext *context) override; grpc::ServerWriteReactor *PullBackup( grpc::CallbackServerContext *context, const backup::PullBackupRequest *request) override; }; } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index 2d4e57726..444507212 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,157 +1,158 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->logID, (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { return backupID + tools::ID_SEPARATOR + std::to_string(tools::getCurrentTimestamp()); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->logID = this->generateLogID(this->backupID); + this->response->set_logcheckpoint(this->logID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { this->value = tools::generateHolder(this->hash, this->backupID, this->logID); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->getStatusHolder()->getStatus().error_code() << "/" << this->getStatusHolder()->getStatus().error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.h b/services/backup/src/Reactors/server/SendLogReactor.h index bcc807c04..69c3679de 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.h +++ b/services/backup/src/Reactors/server/SendLogReactor.h @@ -1,66 +1,66 @@ #pragma once #include "ServerReadReactorBase.h" #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include namespace comm { namespace network { namespace reactor { class SendLogReactor : public ServerReadReactorBase< backup::SendLogRequest, - google::protobuf::Empty> { + backup::SendLogResponse> { enum class State { USER_ID = 1, BACKUP_ID = 2, LOG_HASH = 3, LOG_CHUNK = 4, }; enum class PersistenceMethod { UNKNOWN = 0, DB = 1, BLOB = 2, }; State state = State::USER_ID; PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN; std::string userID; std::string logID; std::string backupID; std::string hash; // either the value itself which is a dump of a single operation (if // `persistedInBlob` is false) or the holder to blob (if `persistedInBlob` is // true) std::string value; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; std::shared_ptr putReactor; ServiceBlobClient blobClient; void storeInDatabase(); std::string generateLogID(const std::string &backupID); void initializePutReactor(); public: - using ServerReadReactorBase:: + using ServerReadReactorBase:: ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; void terminateCallback() override; }; } // namespace reactor } // namespace network } // namespace comm