diff --git a/services/backup/docker-server/contents/server/src/Constants.h b/services/backup/docker-server/contents/server/src/Constants.h
index 0d52ac443..84b04092b 100644
--- a/services/backup/docker-server/contents/server/src/Constants.h
+++ b/services/backup/docker-server/contents/server/src/Constants.h
@@ -1,34 +1,44 @@
 #pragma once
 
 #include <string>
 
 namespace comm {
 namespace network {
 
 // 4MB limit
 // WARNING: use keeping in mind that grpc adds its own headers to messages
 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
 // so the message that actually is being sent over the network looks like this
 // [Compressed-Flag] [Message-Length] [Message]
 // [Compressed-Flag] 1 byte - added by grpc
 // [Message-Length] 4 bytes - added by grpc
 // [Message] N bytes - actual data
 // so for every message we get 5 additional bytes of data
 // as mentioned here
 // https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
 // grpc stream may contain more than one message
 const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
 const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;
 
 const std::string AWS_REGION = "us-east-2";
 
 #ifdef COMM_TEST_SERVICES
 const std::string LOG_TABLE_NAME = "backup-service-log-test";
 const std::string BACKUP_TABLE_NAME = "backup-service-backup-test";
 #else
 const std::string LOG_TABLE_NAME = "backup-service-log";
 const std::string BACKUP_TABLE_NAME = "backup-service-backup";
 #endif
 
+// This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to
+// recognize if we may receive multiple chunks or just one. If it was larger
+// than the chunk limit, once we get the amount of data of size equal to the
+// limit, we wouldn't know if we should put this in the database right away or
+// wait for more data.
+const size_t LOG_DATA_SIZE_DATABASE_LIMIT = 1 * 1024 * 1024;
+static_assert(
+    LOG_DATA_SIZE_DATABASE_LIMIT < GRPC_CHUNK_SIZE_LIMIT,
+    "the database limit has to be smaller than the grpc chunk limit");
+
 } // namespace network
 } // namespace comm
diff --git a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
index 715542c8f..349c2724b 100644
--- a/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
+++ b/services/backup/docker-server/contents/server/src/Reactors/server/SendLogReactor.h
@@ -1,67 +1,101 @@
 #pragma once
 
+#include "Constants.h"
 #include "ServerReadReactorBase.h"
 
 #include "../_generated/backup.grpc.pb.h"
 #include "../_generated/backup.pb.h"
 
 #include <iostream>
 #include <memory>
 #include <string>
 
 namespace comm {
 namespace network {
 namespace reactor {
 
 class SendLogReactor : public ServerReadReactorBase<
                            backup::SendLogRequest,
                            google::protobuf::Empty> {
   enum class State {
     USER_ID = 1,
     LOG_CHUNK = 2,
   };
 
+  // How the log received on this stream gets persisted. The method is picked
+  // when the first log chunk arrives and is not changed afterwards.
+  enum class PersistenceMethod {
+    UNKNOWN = 0,
+    DB = 1,
+    BLOB = 2,
+  };
+
   State state = State::USER_ID;
+  PersistenceMethod persistenceMethod = PersistenceMethod::UNKNOWN;
   std::string userID;
 
+  // TODO implement
+  void storeInDatabase(const std::string &data) {
+  }
+
+  // TODO implement
+  void storeInBlob(const std::string &data) {
+  }
+
 public:
   using ServerReadReactorBase<backup::SendLogRequest, google::protobuf::Empty>::
       ServerReadReactorBase;
 
   std::unique_ptr<grpc::Status>
   readRequest(backup::SendLogRequest request) override;
   void doneCallback() override;
 };
 
 std::unique_ptr<grpc::Status>
 SendLogReactor::readRequest(backup::SendLogRequest request) {
   switch (this->state) {
     case State::USER_ID: {
       if (!request.has_userid()) {
         throw std::runtime_error("user id expected but not received");
       }
       this->userID = request.userid();
       this->state = State::LOG_CHUNK;
       return nullptr;
     };
     case State::LOG_CHUNK: {
       if (!request.has_logdata()) {
         throw std::runtime_error("log data expected but not received");
       }
-      std::string chunk = request.logdata();
-      std::cout << "log data received " << chunk << std::endl;
+      // a log that went to the database has to fit in a single chunk
+      if (this->persistenceMethod == PersistenceMethod::DB) {
+        throw std::runtime_error(
+            "storing multiple chunks in the database is not allowed");
+      }
+      std::string *chunk = request.mutable_logdata();
+      // Only the first chunk may go to the database, and only if it fits in
+      // the database limit. Anything else is uploaded to the blob: either the
+      // first chunk is too big or the upload to the blob has already started.
+      if (this->persistenceMethod == PersistenceMethod::UNKNOWN &&
+          chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) {
+        this->persistenceMethod = PersistenceMethod::DB;
+        this->storeInDatabase(*chunk);
+      } else {
+        this->persistenceMethod = PersistenceMethod::BLOB;
+        this->storeInBlob(*chunk);
+      }
+      std::cout << "log data received " << chunk->size() << std::endl;
       return nullptr;
     };
   }
   throw std::runtime_error("send log - invalid state");
 }
 
 void SendLogReactor::doneCallback() {
   // TODO implement
   std::cout << "receive logs done " << this->status.error_code() << "/"
             << this->status.error_message() << std::endl;
 }
 
 } // namespace reactor
 } // namespace network
 } // namespace comm