diff --git a/native/cpp/CommonCpp/grpc/protos/backup.proto b/native/cpp/CommonCpp/grpc/protos/backup.proto index e6c151ef1..82b59abcc 100644 --- a/native/cpp/CommonCpp/grpc/protos/backup.proto +++ b/native/cpp/CommonCpp/grpc/protos/backup.proto @@ -1,76 +1,77 @@ syntax = "proto3"; package backup; import "google/protobuf/empty.proto"; /** * API - description * CreateNewBackup - This method is called when we want to create a new backup. * We send a new backup key encrypted with the user's password and also the * new compaction. New logs that will be sent from now on will be assigned to * this backup. * SendLog - User sends a new log to the backup service. The log is being * assigned to the latest(or desired) backup's compaction item. * RecoverBackupKey - Pulls data necessary for regenerating the backup key * on the client-side for the latest(or desired) backup * PullBackup - Fetches compaction + all logs assigned to it for the * specified backup(default is the last backup) */ service BackupService { rpc CreateNewBackup(stream CreateNewBackupRequest) returns (stream CreateNewBackupResponse) {} rpc SendLog(stream SendLogRequest) returns (google.protobuf.Empty) {} rpc RecoverBackupKey(stream RecoverBackupKeyRequest) returns (stream RecoverBackupKeyResponse) {} rpc PullBackup(PullBackupRequest) returns (stream PullBackupResponse) {} } // CreateNewBackup message CreateNewBackupRequest { oneof data { string userID = 1; - bytes keyEntropy = 2; - bytes newCompactionHash = 3; - bytes newCompactionChunk = 4; + string deviceID = 2; + bytes keyEntropy = 3; + bytes newCompactionHash = 4; + bytes newCompactionChunk = 5; } } message CreateNewBackupResponse { string backupID = 1; } // SendLog message SendLogRequest { oneof data { string userID = 1; string backupID = 2; bytes logHash = 3; bytes logData = 4; } } // RecoverBackupKey message RecoverBackupKeyRequest { string userID = 1; } message RecoverBackupKeyResponse { string backupID = 4; } // PullBackup message PullBackupRequest { string userID = 1; string backupID = 2; } message PullBackupResponse { oneof data { bytes compactionChunk = 1; bytes logChunk = 2; } } diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index fafeed911..5a7a3fab8 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,101 +1,109 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); + this->state = State::DEVICE_ID; + return nullptr; + } + case State::DEVICE_ID: { + if (!request.has_deviceid()) { + throw std::runtime_error("device id expected but not received"); + } + this->deviceID = request.deviceid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); response->set_backupid(this->backupID); this->holder = this->backupID; this->putReactor = std::make_shared( this->holder, this->dataHash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state == ReactorState::DONE && !this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lock2); } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } try { // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, getCurrentTimestamp(), generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } catch (std::runtime_error &e) { std::cout << "db operations error: " << e.what() << std::endl; } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/src/Reactors/server/CreateNewBackupReactor.h index e45a39698..e3d312587 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.h @@ -1,54 +1,56 @@ #pragma once #include "ServiceBlobClient.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include "ServerBidiReactorBase.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { USER_ID = 1, - KEY_ENTROPY = 2, - DATA_HASH = 3, - DATA_CHUNKS = 4, + DEVICE_ID = 2, + KEY_ENTROPY = 3, + DATA_HASH = 4, + DATA_CHUNKS = 5, }; State state = State::USER_ID; std::string userID; + std::string deviceID; std::string keyEntropy; std::string dataHash; std::string holder; std::string backupID; std::shared_ptr putReactor; ServiceBlobClient blobClient; std::mutex reactorStateMutex; std::condition_variable blobPutDoneCV; std::mutex blobPutDoneCVMutex; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void terminateCallback() override; }; } // namespace reactor } // namespace network } // namespace comm