diff --git a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
index 21bd8b561..89e9c87ad 100644
--- a/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
+++ b/services/backup/src/Reactors/server/AddAttachmentsUtility.cpp
@@ -1,99 +1,102 @@
 #include "AddAttachmentsUtility.h"
 
 #include "blob_client/src/lib.rs.h"
 
 #include <glog/logging.h>
 
 #include "BackupItem.h"
 #include "Constants.h"
 #include "DatabaseManager.h"
 #include "Tools.h"
 
 namespace comm {
 namespace network {
 namespace reactor {
 
 grpc::Status AddAttachmentsUtility::processRequest(
     const backup::AddAttachmentsRequest *request) {
   grpc::Status status = grpc::Status::OK;
   std::string userID = request->userid();
   std::string backupID = request->backupid();
   std::string logID = request->logid();
   const std::string holders = request->holders();
   try {
     if (userID.empty()) {
       throw std::runtime_error("user id required but not provided");
     }
     if (backupID.empty()) {
       throw std::runtime_error("backup id required but not provided");
     }
     if (holders.empty()) {
       throw std::runtime_error("holders required but not provided");
     }
 
     if (logID.empty()) {
       // add these attachments to backup
       std::shared_ptr<database::BackupItem> backupItem =
           database::DatabaseManager::getInstance().findBackupItem(
               userID, backupID);
       backupItem->addAttachmentHolders(holders);
       database::DatabaseManager::getInstance().putBackupItem(*backupItem);
     } else {
       // add these attachments to log
       std::shared_ptr<database::LogItem> logItem =
           database::DatabaseManager::getInstance().findLogItem(
               backupID, logID);
       logItem->addAttachmentHolders(holders);
       if (!logItem->getPersistedInBlob() &&
           database::LogItem::getItemSize(logItem.get()) >
               LOG_DATA_SIZE_DATABASE_LIMIT) {
         bool old = logItem->getPersistedInBlob();
         logItem = this->moveToS3(logItem);
       }
       database::DatabaseManager::getInstance().putLogItem(*logItem);
     }
   } catch (std::exception &e) {
     LOG(ERROR) << e.what();
     status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
   }
   return status;
 }
 
 std::shared_ptr<database::LogItem>
 AddAttachmentsUtility::moveToS3(std::shared_ptr<database::LogItem> logItem) {
   std::string holder = tools::generateHolder(
       logItem->getDataHash(), logItem->getBackupID(), logItem->getLogID());
   std::string data = std::move(logItem->getValue());
   std::shared_ptr<database::LogItem> newLogItem =
       std::make_shared<database::LogItem>(
           logItem->getBackupID(),
           logItem->getLogID(),
           true,
           holder,
           logItem->getAttachmentHolders(),
           logItem->getDataHash());
   // put into S3
   std::condition_variable blobPutDoneCV;
   std::mutex blobPutDoneCVMutex;
 
-  put_client_initialize_cxx();
+  put_client_initialize_cxx(holder.c_str());
   put_client_write_cxx(
+      holder.c_str(),
       tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
       holder.c_str());
-  put_client_blocking_read_cxx();
+  put_client_blocking_read_cxx(holder.c_str());
   put_client_write_cxx(
+      holder.c_str(),
       tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
       newLogItem->getDataHash().c_str());
-  rust::String responseStr = put_client_blocking_read_cxx();
+  rust::String responseStr = put_client_blocking_read_cxx(holder.c_str());
   // data exists?
   if (!(bool)tools::charPtrToInt(responseStr.c_str())) {
     put_client_write_cxx(
+        holder.c_str(),
         tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
         std::move(data).c_str());
-    put_client_blocking_read_cxx();
+    put_client_blocking_read_cxx(holder.c_str());
   }
-  put_client_terminate_cxx();
+  put_client_terminate_cxx(holder.c_str());
 
   return newLogItem;
 }
 
 } // namespace reactor
 } // namespace network
 } // namespace comm
diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
index 63c60a8de..84f686e25 100644
--- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
+++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp
@@ -1,139 +1,139 @@
 #include "CreateNewBackupReactor.h"
 
 #include "DatabaseManager.h"
 #include "GlobalTools.h"
 #include "Tools.h"
 
 #include "blob_client/src/lib.rs.h"
 
 namespace comm {
 namespace network {
 namespace reactor {
 
 std::string CreateNewBackupReactor::generateBackupID() {
   if (this->deviceID.empty()) {
     throw std::runtime_error(
         "trying to generate a backup ID with an empty device ID");
   }
   return this->deviceID + std::to_string(tools::getCurrentTimestamp());
 }
 
 std::unique_ptr<ServerBidiReactorStatus> CreateNewBackupReactor::handleRequest(
     backup::CreateNewBackupRequest request,
     backup::CreateNewBackupResponse *response) {
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
   switch (this->state) {
     case State::USER_ID: {
       if (!request.has_userid()) {
         throw std::runtime_error("user id expected but not received");
       }
       this->userID = request.userid();
       this->state = State::DEVICE_ID;
       return nullptr;
     }
     case State::DEVICE_ID: {
       if (!request.has_deviceid()) {
         throw std::runtime_error("device id expected but not received");
       }
       this->deviceID = request.deviceid();
       this->state = State::KEY_ENTROPY;
       return nullptr;
     }
     case State::KEY_ENTROPY: {
       if (!request.has_keyentropy()) {
         throw std::runtime_error(
             "backup key entropy expected but not received");
       }
       this->keyEntropy = request.keyentropy();
       this->state = State::DATA_HASH;
       return nullptr;
     }
     case State::DATA_HASH: {
       if (!request.has_newcompactionhash()) {
         throw std::runtime_error("data hash expected but not received");
       }
       this->dataHash = request.newcompactionhash();
       this->state = State::DATA_CHUNKS;
 
       this->backupID = this->generateBackupID();
       if (database::DatabaseManager::getInstance().findBackupItem(
               this->userID, this->backupID) != nullptr) {
         throw std::runtime_error(
             "Backup with id [" + this->backupID + "] for user [" +
             this->userID + "] already exists, creation aborted");
       }
       response->set_backupid(this->backupID);
       this->holder = tools::generateHolder(this->dataHash, this->backupID);
-      put_client_initialize_cxx();
+      put_client_initialize_cxx(this->holder.c_str());
       put_client_write_cxx(
+          this->holder.c_str(),
           tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
           this->holder.c_str());
-      put_client_blocking_read_cxx(); // todo this should be avoided
-                                      // (blocking); we should be able to
-                                      // ignore responses; we probably want to
-                                      // delegate performing ops to separate
-                                      // threads in the base reactors
+      put_client_blocking_read_cxx(
+          this->holder.c_str()); // todo this should be avoided
+                                 // (blocking); we should be able to
+                                 // ignore responses; we probably want to
+                                 // delegate performing ops to separate
+                                 // threads in the base reactors
       put_client_write_cxx(
+          this->holder.c_str(),
           tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
           this->dataHash.c_str());
-      rust::String responseStr =
-          put_client_blocking_read_cxx(); // todo this should be avoided
-                                          // (blocking); we should be able to
-                                          // ignore responses; we probably
-                                          // want to delegate performing ops
-                                          // to separate threads in the base
-                                          // reactors
+      rust::String responseStr = put_client_blocking_read_cxx(
+          this->holder.c_str()); // todo this should be avoided
+                                 // (blocking); we should be able to
+                                 // ignore responses; we probably
+                                 // want to delegate performing ops
+                                 // to separate threads in the base
+                                 // reactors
       // data exists?
       if ((bool)tools::charPtrToInt(responseStr.c_str())) {
         return std::make_unique<ServerBidiReactorStatus>(
            grpc::Status::OK, true);
       }
       return nullptr;
     }
     case State::DATA_CHUNKS: {
       if (request.mutable_newcompactionchunk()->empty()) {
         return std::make_unique<ServerBidiReactorStatus>(grpc::Status::OK);
       }
-      try {
-        put_client_write_cxx(
-            tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
-            std::string(std::move(*request.mutable_newcompactionchunk()))
-                .c_str());
-        put_client_blocking_read_cxx(); // todo this should be avoided
-                                        // (blocking); we should be able to
-                                        // ignore responses; we probably want to
-                                        // delegate performing ops to separate
-                                        // threads in the base reactors
-      } catch (std::exception &e) {
-        throw std::runtime_error(
-            e.what()); // todo in base reactors we can just handle std exception
-                       // instead of keep rethrowing here
-      }
+      put_client_write_cxx(
+          this->holder.c_str(),
+          tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
+          std::string(std::move(*request.mutable_newcompactionchunk()))
+              .c_str());
+      put_client_blocking_read_cxx(
+          this->holder.c_str()); // todo this should be avoided
+                                 // (blocking); we should be able to
+                                 // ignore responses; we probably want to
+                                 // delegate performing ops to separate
+                                 // threads in the base reactors
+
       return nullptr;
     }
   }
   throw std::runtime_error("new backup - invalid state");
 }
 
 void CreateNewBackupReactor::terminateCallback() {
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
-  put_client_terminate_cxx();
+  put_client_terminate_cxx(this->holder.c_str());
 
   // TODO add recovery data
   // TODO handle attachments holders
   database::BackupItem backupItem(
       this->userID,
       this->backupID,
       tools::getCurrentTimestamp(),
       tools::generateRandomString(),
       this->holder,
       {});
   database::DatabaseManager::getInstance().putBackupItem(backupItem);
 }
 
 } // namespace reactor
 } // namespace network
 } // namespace comm
diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp
index 099fc785c..e6a6c0745 100644
--- a/services/backup/src/Reactors/server/SendLogReactor.cpp
+++ b/services/backup/src/Reactors/server/SendLogReactor.cpp
@@ -1,192 +1,199 @@
 #include "SendLogReactor.h"
 
 #include "blob_client/src/lib.rs.h"
 
 #include "Constants.h"
 #include "DatabaseManager.h"
 #include "GlobalTools.h"
 #include "Tools.h"
 
 namespace comm {
 namespace network {
 namespace reactor {
 
 void SendLogReactor::storeInDatabase() {
   bool storedInBlob = this->persistenceMethod == PersistenceMethod::BLOB;
   database::LogItem logItem(
       this->backupID,
       this->logID,
       storedInBlob,
       storedInBlob ? this->blobHolder : this->value,
       {},
       this->hash);
   if (database::LogItem::getItemSize(&logItem) >
       LOG_DATA_SIZE_DATABASE_LIMIT) {
     throw std::runtime_error(
         "trying to put into the database an item with size " +
         std::to_string(database::LogItem::getItemSize(&logItem)) +
         " that exceeds the limit " +
         std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT));
   }
   database::DatabaseManager::getInstance().putLogItem(logItem);
 }
 
 std::string SendLogReactor::generateLogID(const std::string &backupID) {
   return backupID + tools::ID_SEPARATOR +
       std::to_string(tools::getCurrentTimestamp());
 }
 
 void SendLogReactor::initializePutClient() {
   if (this->blobHolder.empty()) {
     throw std::runtime_error(
         "put reactor cannot be initialized with empty blob holder");
   }
   if (this->hash.empty()) {
     throw std::runtime_error(
         "put reactor cannot be initialized with empty hash");
   }
-  put_client_initialize_cxx();
+  put_client_initialize_cxx(this->blobHolder.c_str());
 }
 
 std::unique_ptr<grpc::Status>
 SendLogReactor::readRequest(backup::SendLogRequest request) {
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
   switch (this->state) {
     case State::USER_ID: {
       if (!request.has_userid()) {
         throw std::runtime_error("user id expected but not received");
       }
       this->userID = request.userid();
       this->state = State::BACKUP_ID;
       return nullptr;
     };
     case State::BACKUP_ID: {
       if (!request.has_backupid()) {
         throw std::runtime_error("backup id expected but not received");
       }
       this->backupID = request.backupid();
       if (database::DatabaseManager::getInstance().findBackupItem(
               this->userID, this->backupID) == nullptr) {
         throw std::runtime_error(
             "trying to send log for a non-existent backup");
       }
       this->logID = this->generateLogID(this->backupID);
       this->response->set_logcheckpoint(this->logID);
       this->state = State::LOG_HASH;
       return nullptr;
     };
     case State::LOG_HASH: {
       if (!request.has_loghash()) {
         throw std::runtime_error("log hash expected but not received");
       }
       this->hash = request.loghash();
       this->state = State::LOG_CHUNK;
       return nullptr;
     };
     case State::LOG_CHUNK: {
       if (!request.has_logdata()) {
         throw std::runtime_error("log data expected but not received");
       }
       if (request.mutable_logdata()->size() == 0) {
         return std::make_unique<grpc::Status>(grpc::Status::OK);
       }
       if (this->persistenceMethod == PersistenceMethod::DB) {
         throw std::runtime_error(
             "please do not send multiple tiny chunks (less than " +
             std::to_string(LOG_DATA_SIZE_DATABASE_LIMIT) +
             "), merge them into bigger parts instead");
       }
       if (this->persistenceMethod == PersistenceMethod::BLOB) {
         put_client_write_cxx(
+            this->blobHolder.c_str(),
             tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
             request.mutable_logdata()->c_str());
-        put_client_blocking_read_cxx(); // todo this should be avoided
-                                        // (blocking); we should be able to
-                                        // ignore responses; we probably want to
-                                        // delegate performing ops to separate
-                                        // threads in the base reactors
+        put_client_blocking_read_cxx(
+            this->blobHolder.c_str()); // todo this should be avoided
+                                       // (blocking); we should be able to
+                                       // ignore responses; we probably want to
+                                       // delegate performing ops to separate
+                                       // threads in the base reactors
         return nullptr;
       }
       this->value += std::move(*request.mutable_logdata());
       database::LogItem logItem = database::LogItem(
           this->backupID, this->logID, true, this->value, "", this->hash);
       if (database::LogItem::getItemSize(&logItem) >
           LOG_DATA_SIZE_DATABASE_LIMIT) {
         this->persistenceMethod = PersistenceMethod::BLOB;
         this->blobHolder =
             tools::generateHolder(this->hash, this->backupID, this->logID);
         this->initializePutClient();
         put_client_write_cxx(
+            this->blobHolder.c_str(),
             tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
             this->blobHolder.c_str());
-        put_client_blocking_read_cxx(); // todo this should be avoided
-                                        // (blocking); we should be able to
-                                        // ignore responses; we probably want to
-                                        // delegate performing ops to separate
-                                        // threads in the base reactors
+        put_client_blocking_read_cxx(
+            this->blobHolder.c_str()); // todo this should be avoided
+                                       // (blocking); we should be able to
+                                       // ignore responses; we probably want to
+                                       // delegate performing ops to separate
+                                       // threads in the base reactors
         put_client_write_cxx(
+            this->blobHolder.c_str(),
             tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
             this->hash.c_str());
-        rust::String responseStr =
-            put_client_blocking_read_cxx(); // todo this should be avoided
-                                            // (blocking); we should be able to
-                                            // ignore responses; we probably
-                                            // want to delegate performing ops
-                                            // to separate threads in the base
-                                            // reactors
+        rust::String responseStr = put_client_blocking_read_cxx(
+            this->blobHolder.c_str()); // todo this should be avoided
+                                       // (blocking); we should be able to
+                                       // ignore responses; we probably
+                                       // want to delegate performing ops
+                                       // to separate threads in the base
+                                       // reactors
         // data exists?
         if ((bool)tools::charPtrToInt(responseStr.c_str())) {
           return std::make_unique<grpc::Status>(grpc::Status::OK);
         }
         put_client_write_cxx(
+            this->blobHolder.c_str(),
            tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
            std::move(this->value).c_str());
-        put_client_blocking_read_cxx(); // todo this should be avoided
-                                        // (blocking); we should be able to
-                                        // ignore responses; we probably want to
-                                        // delegate performing ops to separate
-                                        // threads in the base reactors
+        put_client_blocking_read_cxx(
+            this->blobHolder.c_str()); // todo this should be avoided
+                                       // (blocking); we should be able to
+                                       // ignore responses; we probably want to
+                                       // delegate performing ops to separate
+                                       // threads in the base reactors
         this->value = "";
       } else {
         this->persistenceMethod = PersistenceMethod::DB;
       }
       return nullptr;
     };
   }
   throw std::runtime_error("send log - invalid state");
 }
 
 void SendLogReactor::terminateCallback() {
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
-  put_client_terminate_cxx();
+  put_client_terminate_cxx(this->blobHolder.c_str());
 
   if (!this->getStatusHolder()->getStatus().ok()) {
     throw std::runtime_error(
         this->getStatusHolder()->getStatus().error_message());
   }
   if (this->persistenceMethod != PersistenceMethod::BLOB &&
       this->persistenceMethod != PersistenceMethod::DB) {
     throw std::runtime_error("Invalid persistence method detected");
   }
   if (this->persistenceMethod == PersistenceMethod::DB) {
     this->storeInDatabase();
     return;
   }
 
   // store in db only when we successfully upload chunks
   this->storeInDatabase();
 }
 
 void SendLogReactor::doneCallback() {
   // we make sure that the blob client's state is flushed to the main memory
   // as there may be multiple threads from the pool taking over here
   const std::lock_guard<std::mutex> lock(this->reactorStateMutex);
   // TODO implement
 }
 
 } // namespace reactor
 } // namespace network
 } // namespace comm
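For reviewers, a minimal sketch of the put-client call sequence after this change, pieced together from the reactors above. It assumes the same includes those files use (blob_client/src/lib.rs.h, Tools.h); the holder, dataHash, and chunk variables are hypothetical locals for illustration, not part of the diff.

    // Each put client is now keyed by its holder string, so several uploads
    // can run concurrently without sharing one global client.
    std::string holder = "example-holder"; // hypothetical value
    std::string dataHash = "example-hash"; // hypothetical value
    std::string chunk = "example-data";    // hypothetical value

    put_client_initialize_cxx(holder.c_str());
    put_client_write_cxx(
        holder.c_str(),
        tools::getBlobPutField(blob::PutRequest::DataCase::kHolder),
        holder.c_str());
    put_client_blocking_read_cxx(holder.c_str());
    put_client_write_cxx(
        holder.c_str(),
        tools::getBlobPutField(blob::PutRequest::DataCase::kBlobHash),
        dataHash.c_str());
    // the response tells us whether the blob already exists for this hash
    rust::String dataExists = put_client_blocking_read_cxx(holder.c_str());
    if (!(bool)tools::charPtrToInt(dataExists.c_str())) {
      put_client_write_cxx(
          holder.c_str(),
          tools::getBlobPutField(blob::PutRequest::DataCase::kDataChunk),
          chunk.c_str());
      put_client_blocking_read_cxx(holder.c_str());
    }
    put_client_terminate_cxx(holder.c_str());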