diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h index 93532741d..1c025f709 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/CreateNewBackupReactor.h @@ -1,100 +1,110 @@ #pragma once #include "ServerBidiReactorBase.h" #include "ServiceBlobClient.h" #include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class CreateNewBackupReactor : public ServerBidiReactorBase< backup::CreateNewBackupRequest, backup::CreateNewBackupResponse> { enum class State { - KEY_ENTROPY = 1, - DATA_HASH = 2, - DATA_CHUNKS = 3, + USER_ID = 1, + KEY_ENTROPY = 2, + DATA_HASH = 3, + DATA_CHUNKS = 4, }; - State state = State::KEY_ENTROPY; + State state = State::USER_ID; + std::string userID; std::string keyEntropy; std::string dataHash; std::string backupID; std::shared_ptr putReactor; ServiceBlobClient blobClient; std::mutex reactorStateMutex; std::condition_variable blobDoneCV; std::mutex blobDoneCVMutex; std::string generateBackupID(); public: std::unique_ptr handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) override; void doneCallback(); }; std::string CreateNewBackupReactor::generateBackupID() { // mock return generateRandomString(); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { + case State::USER_ID: { + if (!request.has_userid()) { + throw std::runtime_error("user id expected but not received"); + } + this->userID = request.userid(); + this->state = State::KEY_ENTROPY; + return nullptr; + } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; // TODO confirm - holder may be a backup id this->backupID = this->generateBackupID(); this->putReactor = std::make_shared( this->backupID, this->dataHash, &this->blobDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::doneCallback() { const std::lock_guard lock(this->reactorStateMutex); this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobDoneCVMutex); this->blobDoneCV.wait(lock2); } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h index 259f5c97a..692330271 100644 --- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h +++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h @@ -1,45 +1,59 @@ #pragma once #include "ServerReadReactorBase.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include namespace comm { namespace network { namespace reactor { class SendLogReactor : public ServerReadReactorBase< backup::SendLogRequest, google::protobuf::Empty> { + enum class State { + USER_ID = 1, + LOG_CHUNK = 2, + }; + + State state = State::USER_ID; + std::string userID; public: using ServerReadReactorBase:: ServerReadReactorBase; std::unique_ptr readRequest(backup::SendLogRequest request) override; void doneCallback() override; }; std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { - // TODO implement - std::cout << "handle request log chunk " << request.logdata().size() - << std::endl; - return nullptr; + switch (this->state) { + case State::USER_ID: { + if (!request.has_userid()) { + throw std::runtime_error("user id expected but not received"); + } + this->userID = request.userid(); + this->state = State::LOG_CHUNK; + return nullptr; + }; + } + throw std::runtime_error("send log - invalid state"); } void SendLogReactor::doneCallback() { // TODO implement std::cout << "receive logs done " << this->status.error_code() << "/" << this->status.error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm