diff --git a/services/backup/src/Constants.h b/services/backup/src/Constants.h index 8711dcea3..85379103d 100644 --- a/services/backup/src/Constants.h +++ b/services/backup/src/Constants.h @@ -1,23 +1,24 @@ #pragma once #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { -const std::string LOG_TABLE_NAME = decorateTableName("backup-service-log"); +const std::string LOG_TABLE_NAME = + tools::decorateTableName("backup-service-log"); const std::string BACKUP_TABLE_NAME = - decorateTableName("backup-service-backup"); + tools::decorateTableName("backup-service-backup"); // This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to // recognize if we may receive multiple chunks or just one. If it was larger // than the chunk limit, once we get the amount of data of size equal to the // limit, we wouldn't know if we should put this in the database right away or // wait for more data. const size_t LOG_DATA_SIZE_DATABASE_LIMIT = 1 * 1024 * 1024; } // namespace network } // namespace comm diff --git a/services/backup/src/DatabaseManager.cpp b/services/backup/src/DatabaseManager.cpp index 72864349b..80188fd69 100644 --- a/services/backup/src/DatabaseManager.cpp +++ b/services/backup/src/DatabaseManager.cpp @@ -1,160 +1,160 @@ #include "DatabaseManager.h" #include "Constants.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include #include namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } void DatabaseManager::putBackupItem(const BackupItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(BackupItem::tableName); request.AddItem( BackupItem::FIELD_USER_ID, Aws::DynamoDB::Model::AttributeValue(item.getUserID())); request.AddItem( BackupItem::FIELD_CREATED, Aws::DynamoDB::Model::AttributeValue( - std::to_string(getCurrentTimestamp()))); + std::to_string(tools::getCurrentTimestamp()))); request.AddItem( BackupItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(item.getBackupID())); request.AddItem( BackupItem::FIELD_RECOVERY_DATA, Aws::DynamoDB::Model::AttributeValue(item.getRecoveryData())); request.AddItem( BackupItem::FIELD_COMPACTION_HOLDER, Aws::DynamoDB::Model::AttributeValue(item.getCompactionHolder())); if (!item.getAttachmentHolders().empty()) { request.AddItem( BackupItem::FIELD_ATTACHMENT_HOLDERS, Aws::DynamoDB::Model::AttributeValue(item.getAttachmentHolders())); } this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findBackupItem( const std::string &userID, const std::string &backupID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( BackupItem::FIELD_USER_ID, Aws::DynamoDB::Model::AttributeValue(userID)); request.AddKey( BackupItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(backupID)); return std::move(this->innerFindItem(request)); } std::shared_ptr DatabaseManager::findLastBackupItem(const std::string &userID) { std::shared_ptr item = createItemByType(); Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(BackupItem::tableName); req.SetKeyConditionExpression(BackupItem::FIELD_USER_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", userID); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName("userID-created-index"); req.SetLimit(1); req.SetScanIndexForward(false); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); if (items.empty()) { return nullptr; } return std::make_shared(items[0]); } void DatabaseManager::removeBackupItem(std::shared_ptr item) { if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putLogItem(const LogItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(LogItem::tableName); request.AddItem( LogItem::FIELD_BACKUP_ID, Aws::DynamoDB::Model::AttributeValue(item.getBackupID())); request.AddItem( LogItem::FIELD_LOG_ID, Aws::DynamoDB::Model::AttributeValue(item.getLogID())); request.AddItem( LogItem::FIELD_PERSISTED_IN_BLOB, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getPersistedInBlob()))); request.AddItem( LogItem::FIELD_VALUE, Aws::DynamoDB::Model::AttributeValue(item.getValue())); if (!item.getAttachmentHolders().empty()) { request.AddItem( LogItem::FIELD_ATTACHMENT_HOLDERS, Aws::DynamoDB::Model::AttributeValue(item.getAttachmentHolders())); } this->innerPutItem(std::make_shared(item), request); } std::vector> DatabaseManager::findLogItemsForBackup(const std::string &backupID) { std::vector> result; std::shared_ptr item = createItemByType(); Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(LogItem::tableName); req.SetKeyConditionExpression(LogItem::FIELD_BACKUP_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", backupID); req.SetExpressionAttributeValues(attributeValues); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeLogItem(std::shared_ptr item) { if (item == nullptr) { return; } this->innerRemoveItem(*item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp index 0e505631a..9e64256d7 100644 --- a/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp +++ b/services/backup/src/Reactors/server/CreateNewBackupReactor.cpp @@ -1,117 +1,117 @@ #include "CreateNewBackupReactor.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" namespace comm { namespace network { namespace reactor { std::string CreateNewBackupReactor::generateBackupID() { if (this->deviceID.empty()) { throw std::runtime_error( "trying to generate a backup ID with an empty device ID"); } - return this->deviceID + std::to_string(getCurrentTimestamp()); + return this->deviceID + std::to_string(tools::getCurrentTimestamp()); } std::unique_ptr CreateNewBackupReactor::handleRequest( backup::CreateNewBackupRequest request, backup::CreateNewBackupResponse *response) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::DEVICE_ID; return nullptr; } case State::DEVICE_ID: { if (!request.has_deviceid()) { throw std::runtime_error("device id expected but not received"); } this->deviceID = request.deviceid(); this->state = State::KEY_ENTROPY; return nullptr; } case State::KEY_ENTROPY: { if (!request.has_keyentropy()) { throw std::runtime_error( "backup key entropy expected but not received"); } this->keyEntropy = request.keyentropy(); this->state = State::DATA_HASH; return nullptr; } case State::DATA_HASH: { if (!request.has_newcompactionhash()) { throw std::runtime_error("data hash expected but not received"); } this->dataHash = request.newcompactionhash(); this->state = State::DATA_CHUNKS; this->backupID = this->generateBackupID(); if (database::DatabaseManager::getInstance().findBackupItem( this->userID, this->backupID) != nullptr) { throw std::runtime_error( "Backup with id [" + this->backupID + "] for user [" + this->userID + "] already exists, creation aborted"); } response->set_backupid(this->backupID); - this->holder = generateHolder(this->dataHash, this->backupID); + this->holder = tools::generateHolder(this->dataHash, this->backupID); this->putReactor = std::make_shared( this->holder, this->dataHash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); return nullptr; } case State::DATA_CHUNKS: { this->putReactor->scheduleSendingDataChunk(std::make_unique( std::move(*request.mutable_newcompactionchunk()))); return nullptr; } } throw std::runtime_error("new backup - invalid state"); } void CreateNewBackupReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lock2(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state == ReactorState::DONE && !this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lock2); } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } try { // TODO add recovery data // TODO handle attachments holders database::BackupItem backupItem( this->userID, this->backupID, - getCurrentTimestamp(), - generateRandomString(), + tools::getCurrentTimestamp(), + tools::generateRandomString(), this->holder, {}); database::DatabaseManager::getInstance().putBackupItem(backupItem); } catch (std::runtime_error &e) { std::cout << "db operations error: " << e.what() << std::endl; } } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Reactors/server/SendLogReactor.cpp b/services/backup/src/Reactors/server/SendLogReactor.cpp index 30fc0133b..2d4e57726 100644 --- a/services/backup/src/Reactors/server/SendLogReactor.cpp +++ b/services/backup/src/Reactors/server/SendLogReactor.cpp @@ -1,155 +1,157 @@ #include "SendLogReactor.h" #include "Constants.h" #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { namespace reactor { void SendLogReactor::storeInDatabase() { // TODO handle attachment holders database::LogItem logItem( this->backupID, this->logID, (this->persistenceMethod == PersistenceMethod::BLOB), this->value, {}); database::DatabaseManager::getInstance().putLogItem(logItem); } std::string SendLogReactor::generateLogID(const std::string &backupID) { - return backupID + ID_SEPARATOR + std::to_string(getCurrentTimestamp()); + return backupID + tools::ID_SEPARATOR + + std::to_string(tools::getCurrentTimestamp()); } void SendLogReactor::initializePutReactor() { if (this->value.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty value"); } if (this->hash.empty()) { throw std::runtime_error( "put reactor cannot be initialized with empty hash"); } if (this->putReactor == nullptr) { this->putReactor = std::make_shared( this->value, this->hash, &this->blobPutDoneCV); this->blobClient.put(this->putReactor); } } std::unique_ptr SendLogReactor::readRequest(backup::SendLogRequest request) { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); switch (this->state) { case State::USER_ID: { if (!request.has_userid()) { throw std::runtime_error("user id expected but not received"); } this->userID = request.userid(); this->state = State::BACKUP_ID; return nullptr; }; case State::BACKUP_ID: { if (!request.has_backupid()) { throw std::runtime_error("backup id expected but not received"); } this->backupID = request.backupid(); this->logID = this->generateLogID(this->backupID); this->state = State::LOG_HASH; return nullptr; }; case State::LOG_HASH: { if (!request.has_loghash()) { throw std::runtime_error("log hash expected but not received"); } this->hash = request.loghash(); this->state = State::LOG_CHUNK; return nullptr; }; case State::LOG_CHUNK: { if (!request.has_logdata()) { throw std::runtime_error("log data expected but not received"); } std::unique_ptr chunk = std::make_unique(std::move(*request.mutable_logdata())); if (chunk->size() == 0) { return std::make_unique(grpc::Status::OK); } // decide if keep in DB or upload to blob if (chunk->size() <= LOG_DATA_SIZE_DATABASE_LIMIT) { if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::DB; this->value = std::move(*chunk); this->storeInDatabase(); return std::make_unique(grpc::Status::OK); } else if (this->persistenceMethod == PersistenceMethod::BLOB) { this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } else { throw std::runtime_error( "error - invalid persistence state for chunk smaller than " "database limit"); } } else { if (this->persistenceMethod != PersistenceMethod::UNKNOWN && this->persistenceMethod != PersistenceMethod::BLOB) { throw std::runtime_error( "error - invalid persistence state, uploading to blob should be " "continued but it is not"); } if (this->persistenceMethod == PersistenceMethod::UNKNOWN) { this->persistenceMethod = PersistenceMethod::BLOB; } if (this->value.empty()) { - this->value = generateHolder(this->hash, this->backupID, this->logID); + this->value = + tools::generateHolder(this->hash, this->backupID, this->logID); } this->initializePutReactor(); this->putReactor->scheduleSendingDataChunk(std::move(chunk)); } return nullptr; }; } throw std::runtime_error("send log - invalid state"); } void SendLogReactor::terminateCallback() { const std::lock_guard lock(this->reactorStateMutex); if (this->persistenceMethod == PersistenceMethod::DB || this->putReactor == nullptr) { return; } this->putReactor->scheduleSendingDataChunk(std::make_unique("")); std::unique_lock lockPut(this->blobPutDoneCVMutex); if (this->putReactor->getStatusHolder()->state != ReactorState::DONE) { this->blobPutDoneCV.wait(lockPut); } else if (!this->putReactor->getStatusHolder()->getStatus().ok()) { throw std::runtime_error( this->putReactor->getStatusHolder()->getStatus().error_message()); } // store in db only when we successfully upload chunks this->storeInDatabase(); } void SendLogReactor::doneCallback() { // we make sure that the blob client's state is flushed to the main memory // as there may be multiple threads from the pool taking over here const std::lock_guard lock(this->reactorStateMutex); // TODO implement std::cout << "receive logs done " << this->getStatusHolder()->getStatus().error_code() << "/" << this->getStatusHolder()->getStatus().error_message() << std::endl; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/backup/src/Tools.cpp b/services/backup/src/Tools.cpp index afad647f5..44188c3f1 100644 --- a/services/backup/src/Tools.cpp +++ b/services/backup/src/Tools.cpp @@ -1,33 +1,35 @@ #include "Tools.h" #include "GlobalTools.h" #include #include #include namespace comm { namespace network { +namespace tools { std::string generateRandomString(std::size_t length) { const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; thread_local std::random_device generator; std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1); std::string random_string; for (std::size_t i = 0; i < length; ++i) { random_string += CHARACTERS[distribution(generator)]; } return random_string; } std::string generateHolder( const std::string &blobHash, const std::string &backupID, const std::string &resourceID) { return backupID + ID_SEPARATOR + resourceID + ID_SEPARATOR + blobHash + - ID_SEPARATOR + generateUUID(); + ID_SEPARATOR + tools::generateUUID(); } +} // namespace tools } // namespace network } // namespace comm diff --git a/services/backup/src/Tools.h b/services/backup/src/Tools.h index 3c78aded6..47fc5584b 100644 --- a/services/backup/src/Tools.h +++ b/services/backup/src/Tools.h @@ -1,16 +1,18 @@ #pragma once #include namespace comm { namespace network { +namespace tools { std::string generateRandomString(std::size_t length = 20); std::string generateHolder( const std::string &blobHash, const std::string &backupID, const std::string &resourceID = ""); +} // namespace tools } // namespace network } // namespace comm diff --git a/services/backup/test/DatabaseManagerTest.cpp b/services/backup/test/DatabaseManagerTest.cpp index 8e9928203..08a27fcbf 100644 --- a/services/backup/test/DatabaseManagerTest.cpp +++ b/services/backup/test/DatabaseManagerTest.cpp @@ -1,104 +1,105 @@ #include #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include using namespace comm::network::database; class DatabaseManagerTest : public testing::Test { protected: virtual void SetUp() { Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; std::string generateName(const std::string prefix = "") { - return prefix + "-" + std::to_string(comm::network::getCurrentTimestamp()); + return prefix + "-" + + std::to_string(comm::network::tools::getCurrentTimestamp()); } BackupItem generateBackupItem(const std::string &userID, const std::string &backupID) { return BackupItem( userID, backupID, - comm::network::getCurrentTimestamp(), + comm::network::tools::getCurrentTimestamp(), "xxx", "xxx", {""}); } LogItem generateLogItem(const std::string &backupID, const std::string &logID) { return LogItem(backupID, logID, false, "xxx", {""}); } TEST_F(DatabaseManagerTest, TestOperationsOnBackupItems) { const std::string userID = generateName("user001"); std::vector backupIDs = {"backup001", "backup002", "backup003"}; for (const std::string &backupID : backupIDs) { DatabaseManager::getInstance().putBackupItem( generateBackupItem(userID, backupID)); } std::shared_ptr item; while (!backupIDs.empty()) { item = DatabaseManager::getInstance().findLastBackupItem(userID); EXPECT_NE(item, nullptr); EXPECT_EQ(item->getBackupID(), backupIDs.back()); backupIDs.pop_back(); DatabaseManager::getInstance().removeBackupItem(item); }; EXPECT_EQ(DatabaseManager::getInstance().findLastBackupItem(userID), nullptr); } TEST_F(DatabaseManagerTest, TestOperationsOnLogItems) { const std::string backupID1 = generateName("backup001"); const std::string backupID2 = generateName("backup002"); std::vector logIDs1 = {"log001", "log002", "log003"}; for (const std::string &logID : logIDs1) { DatabaseManager::getInstance().putLogItem( generateLogItem(backupID1, logID)); } std::vector logIDs2 = {"log021", "log022"}; for (const std::string &logID : logIDs2) { DatabaseManager::getInstance().putLogItem( generateLogItem(backupID2, logID)); } std::vector> items1 = DatabaseManager::getInstance().findLogItemsForBackup(backupID1); std::vector> items2 = DatabaseManager::getInstance().findLogItemsForBackup(backupID2); EXPECT_EQ(items1.size(), 3); EXPECT_EQ(items2.size(), 2); for (size_t i = 0; i < items1.size(); ++i) { EXPECT_EQ(logIDs1.at(i), items1.at(i)->getLogID()); DatabaseManager::getInstance().removeLogItem(items1.at(i)); } EXPECT_EQ( DatabaseManager::getInstance().findLogItemsForBackup(backupID1).size(), 0); for (size_t i = 0; i < items2.size(); ++i) { EXPECT_EQ(logIDs2.at(i), items2.at(i)->getLogID()); DatabaseManager::getInstance().removeLogItem(items2.at(i)); } EXPECT_EQ( DatabaseManager::getInstance().findLogItemsForBackup(backupID2).size(), 0); } diff --git a/services/blob/src/AwsS3Bucket.cpp b/services/blob/src/AwsS3Bucket.cpp index 141745974..268fa0c94 100644 --- a/services/blob/src/AwsS3Bucket.cpp +++ b/services/blob/src/AwsS3Bucket.cpp @@ -1,224 +1,224 @@ #include "AwsS3Bucket.h" #include "Constants.h" #include "GlobalConstants.h" #include "MultiPartUploader.h" #include "S3Tools.h" #include "Tools.h" #include #include #include #include #include #include #include #include #include namespace comm { namespace network { AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) { } std::vector AwsS3Bucket::listObjects() const { Aws::S3::Model::ListObjectsRequest request; request.SetBucket(this->name); std::vector result; Aws::S3::Model::ListObjectsOutcome outcome = getS3Client()->ListObjects(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::Vector objects = outcome.GetResult().GetContents(); for (Aws::S3::Model::Object &object : objects) { result.push_back(object.GetKey()); } return result; } bool AwsS3Bucket::isAvailable() const { Aws::S3::Model::HeadBucketRequest headRequest; headRequest.SetBucket(this->name); Aws::S3::Model::HeadBucketOutcome outcome = getS3Client()->HeadBucket(headRequest); return outcome.IsSuccess(); } size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const { Aws::S3::Model::HeadObjectRequest headRequest; headRequest.SetBucket(this->name); headRequest.SetKey(objectName); Aws::S3::Model::HeadObjectOutcome headOutcome = getS3Client()->HeadObject(headRequest); if (!headOutcome.IsSuccess()) { throw std::runtime_error(headOutcome.GetError().GetMessage()); } return headOutcome.GetResultWithOwnership().GetContentLength(); } void AwsS3Bucket::renameObject( const std::string ¤tName, const std::string &newName) { Aws::S3::Model::CopyObjectRequest copyRequest; copyRequest.SetCopySource(this->name + "/" + currentName); copyRequest.SetKey(newName); copyRequest.SetBucket(this->name); Aws::S3::Model::CopyObjectOutcome copyOutcome = getS3Client()->CopyObject(copyRequest); if (!copyOutcome.IsSuccess()) { throw std::runtime_error(copyOutcome.GetError().GetMessage()); } this->removeObject(currentName); } void AwsS3Bucket::writeObject( const std::string &objectName, const std::string &data) { // we don't have to handle multiple write here because the GRPC limit is 4MB // and minimum size of data to perform multipart upload is 5MB Aws::S3::Model::PutObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); std::shared_ptr body = std::shared_ptr( new boost::interprocess::bufferstream((char *)data.data(), data.size())); request.SetBody(body); Aws::S3::Model::PutObjectOutcome outcome = getS3Client()->PutObject(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } std::string AwsS3Bucket::getObjectData(const std::string &objectName) const { Aws::S3::Model::GetObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); const size_t size = this->getObjectSize(objectName); if (size > GRPC_CHUNK_SIZE_LIMIT) { - throw invalid_argument_error(std::string( + throw tools::invalid_argument_error(std::string( "The file is too big(" + std::to_string(size) + " bytes, max is " + std::to_string(GRPC_CHUNK_SIZE_LIMIT) + "bytes), please, use getObjectDataChunks")); } Aws::S3::Model::GetObjectOutcome outcome = getS3Client()->GetObject(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = outcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); std::string cpy = result; return result; } void AwsS3Bucket::getObjectDataChunks( const std::string &objectName, const std::function &callback, const size_t chunkSize) const { const size_t fileSize = this->getObjectSize(objectName); if (fileSize == 0) { return; } Aws::S3::Model::GetObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); for (size_t offset = 0; offset < fileSize; offset += chunkSize) { const size_t nextSize = std::min(chunkSize, fileSize - offset); std::string range = "bytes=" + std::to_string(offset) + "-" + std::to_string(offset + nextSize); request.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = getS3Client()->GetObject(request); if (!getOutcome.IsSuccess()) { throw std::runtime_error(getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); result.resize(nextSize); callback(result); } } void AwsS3Bucket::appendToObject( const std::string &objectName, const std::string &data) { const size_t objectSize = this->getObjectSize(objectName); if (objectSize < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { std::string currentData = this->getObjectData(objectName); currentData += data; this->writeObject(objectName, currentData); return; } size_t currentSize = 0; MultiPartUploader uploader( getS3Client(), this->name, objectName + "-multipart"); std::function callback = [&uploader, &data, ¤tSize, objectSize](const std::string &chunk) { currentSize += chunk.size(); if (currentSize < objectSize) { uploader.addPart(chunk); } else if (currentSize == objectSize) { uploader.addPart(std::string(chunk + data)); } else { throw std::runtime_error( "size of chunks exceeds the size of the object"); } }; this->getObjectDataChunks( objectName, callback, AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE); uploader.finishUpload(); // this will overwrite the target file this->renameObject(objectName + "-multipart", objectName); const size_t newSize = this->getObjectSize(objectName); if (objectSize + data.size() != newSize) { throw std::runtime_error( "append to object " + objectName + " has been performed but the final sizes don't " "match, the size is now [" + std::to_string(newSize) + "] but should be [" + std::to_string(objectSize + data.size()) + "]"); } } void AwsS3Bucket::clearObject(const std::string &objectName) { this->writeObject(objectName, ""); } void AwsS3Bucket::removeObject(const std::string &objectName) { Aws::S3::Model::DeleteObjectRequest deleteRequest; deleteRequest.SetBucket(this->name); deleteRequest.SetKey(objectName); Aws::S3::Model::DeleteObjectOutcome deleteOutcome = getS3Client()->DeleteObject(deleteRequest); if (!deleteOutcome.IsSuccess()) { throw std::runtime_error(deleteOutcome.GetError().GetMessage()); } } } // namespace network } // namespace comm diff --git a/services/blob/src/BlobServiceImpl.cpp b/services/blob/src/BlobServiceImpl.cpp index ab641e98f..763ebb69e 100644 --- a/services/blob/src/BlobServiceImpl.cpp +++ b/services/blob/src/BlobServiceImpl.cpp @@ -1,102 +1,102 @@ #include "BlobServiceImpl.h" #include "Constants.h" #include "DatabaseManager.h" #include "MultiPartUploader.h" #include "S3Tools.h" #include "Tools.h" #include "GetReactor.h" #include "PutReactor.h" #include #include namespace comm { namespace network { BlobServiceImpl::BlobServiceImpl() { Aws::InitAPI({}); if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) { throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available"); } } BlobServiceImpl::~BlobServiceImpl() { Aws::ShutdownAPI({}); } void BlobServiceImpl::verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path) { - const std::string computedBlobHash = computeHashForFile(s3Path); + const std::string computedBlobHash = tools::computeHashForFile(s3Path); if (expectedBlobHash != computedBlobHash) { throw std::runtime_error( "blob hash mismatch, expected: [" + expectedBlobHash + "], computed: [" + computedBlobHash + "]"); } } void BlobServiceImpl::assignVariableIfEmpty( const std::string &label, std::string &lvalue, const std::string &rvalue) { if (!lvalue.empty()) { throw std::runtime_error( "multiple assignment for variable " + label + " is not allowed"); } lvalue = rvalue; } grpc::ServerBidiReactor * BlobServiceImpl::Put(grpc::CallbackServerContext *context) { return new reactor::PutReactor(); } grpc::ServerWriteReactor *BlobServiceImpl::Get( grpc::CallbackServerContext *context, const blob::GetRequest *request) { reactor::GetReactor *gr = new reactor::GetReactor(request); gr->start(); return gr; } grpc::ServerUnaryReactor *BlobServiceImpl::Remove( grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) { grpc::Status status = grpc::Status::OK; const std::string holder = request->holder(); try { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error("no item found for holder: " + holder); } // TODO handle cleanup here properly // for now the object's being removed right away const std::string blobHash = reverseIndexItem->getBlobHash(); database::DatabaseManager::getInstance().removeReverseIndexItem(holder); if (database::DatabaseManager::getInstance() .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash()) .size() == 0) { - database::S3Path s3Path = findS3Path(*reverseIndexItem); + database::S3Path s3Path = tools::findS3Path(*reverseIndexItem); AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); bucket.removeObject(s3Path.getObjectName()); database::DatabaseManager::getInstance().removeBlobItem(blobHash); } } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } auto *reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; } } // namespace network } // namespace comm diff --git a/services/blob/src/Constants.h b/services/blob/src/Constants.h index 1325fee1d..29da7424d 100644 --- a/services/blob/src/Constants.h +++ b/services/blob/src/Constants.h @@ -1,21 +1,22 @@ #pragma once #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { // 5MB limit const size_t AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE = 5 * 1024 * 1024; const std::string BLOB_BUCKET_NAME = "commapp-blob"; -const std::string BLOB_TABLE_NAME = decorateTableName("blob-service-blob"); +const std::string BLOB_TABLE_NAME = + tools::decorateTableName("blob-service-blob"); const std::string REVERSE_INDEX_TABLE_NAME = - decorateTableName("blob-service-reverse-index"); + tools::decorateTableName("blob-service-reverse-index"); } // namespace network } // namespace comm diff --git a/services/blob/src/DatabaseManager.cpp b/services/blob/src/DatabaseManager.cpp index 5006db3e5..273cb9724 100644 --- a/services/blob/src/DatabaseManager.cpp +++ b/services/blob/src/DatabaseManager.cpp @@ -1,121 +1,121 @@ #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include #include namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } void DatabaseManager::putBlobItem(const BlobItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(BlobItem::tableName); request.AddItem( BlobItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(item.getBlobHash())); request.AddItem( BlobItem::FIELD_S3_PATH, Aws::DynamoDB::Model::AttributeValue(item.getS3Path().getFullPath())); request.AddItem( BlobItem::FIELD_CREATED, Aws::DynamoDB::Model::AttributeValue( - std::to_string(getCurrentTimestamp()))); + std::to_string(tools::getCurrentTimestamp()))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findBlobItem(const std::string &blobHash) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( BlobItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(blobHash)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeBlobItem(const std::string &blobHash) { std::shared_ptr item = this->findBlobItem(blobHash); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) { if (this->findReverseIndexItemByHolder(item.getHolder()) != nullptr) { throw std::runtime_error( "An item for the given holder [" + item.getHolder() + "] already exists"); } Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(ReverseIndexItem::tableName); request.AddItem( ReverseIndexItem::FIELD_HOLDER, Aws::DynamoDB::Model::AttributeValue(item.getHolder())); request.AddItem( ReverseIndexItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(item.getBlobHash())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( ReverseIndexItem::FIELD_HOLDER, Aws::DynamoDB::Model::AttributeValue(holder)); return std::move(this->innerFindItem(request)); } std::vector> DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(ReverseIndexItem::tableName); req.SetKeyConditionExpression("blobHash = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", blobHash); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName("blobHash-index"); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeReverseIndexItem(const std::string &holder) { std::shared_ptr item = findReverseIndexItemByHolder(holder); if (item == nullptr) { return; } this->innerRemoveItem(*item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h index 1a4689774..906703b2b 100644 --- a/services/blob/src/Reactors/server/GetReactor.h +++ b/services/blob/src/Reactors/server/GetReactor.h @@ -1,89 +1,89 @@ #pragma once #include "GlobalConstants.h" #include "S3Tools.h" #include "ServerWriteReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class GetReactor : public ServerWriteReactorBase { size_t offset = 0; size_t fileSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; database::S3Path s3Path; Aws::S3::Model::GetObjectRequest getRequest; public: using ServerWriteReactorBase:: ServerWriteReactorBase; std::unique_ptr writeResponse(blob::GetResponse *response) override { if (this->offset >= this->fileSize) { return std::make_unique(grpc::Status::OK); } const size_t nextSize = std::min(this->chunkSize, this->fileSize - this->offset); std::string range = "bytes=" + std::to_string(this->offset) + "-" + std::to_string(this->offset + nextSize - 1); this->getRequest.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = getS3Client()->GetObject(this->getRequest); if (!getOutcome.IsSuccess()) { return std::make_unique( grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); response->set_datachunk(result); this->offset += nextSize; return nullptr; } void initialize() override { - this->s3Path = findS3Path(this->request.holder()); + this->s3Path = tools::findS3Path(this->request.holder()); this->fileSize = getBucket(s3Path.getBucketName()).getObjectSize(s3Path.getObjectName()); this->getRequest.SetBucket(this->s3Path.getBucketName()); this->getRequest.SetKey(this->s3Path.getObjectName()); AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName()); if (!bucket.isAvailable()) { throw std::runtime_error( "bucket [" + this->s3Path.getBucketName() + "] not available"); } const size_t fileSize = bucket.getObjectSize(this->s3Path.getObjectName()); if (this->fileSize == 0) { throw std::runtime_error("object empty"); } }; void doneCallback() override{}; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/PutReactor.h b/services/blob/src/Reactors/server/PutReactor.h index 615b8f181..5769d370f 100644 --- a/services/blob/src/Reactors/server/PutReactor.h +++ b/services/blob/src/Reactors/server/PutReactor.h @@ -1,103 +1,103 @@ #pragma once #include "ServerBidiReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include namespace comm { namespace network { namespace reactor { class PutReactor : public ServerBidiReactorBase { std::string holder; std::string blobHash; std::string currentChunk; std::unique_ptr s3Path; std::shared_ptr blobItem; std::unique_ptr uploader; bool dataExists = false; public: std::unique_ptr handleRequest( blob::PutRequest request, blob::PutResponse *response) override { if (this->holder.empty()) { if (request.holder().empty()) { throw std::runtime_error("holder has not been provided"); } this->holder = request.holder(); return nullptr; } if (this->blobHash.empty()) { if (request.blobhash().empty()) { throw std::runtime_error("blob hash has not been provided"); } this->blobHash = request.blobhash(); this->blobItem = database::DatabaseManager::getInstance().findBlobItem(this->blobHash); if (this->blobItem != nullptr) { this->s3Path = std::make_unique(this->blobItem->getS3Path()); response->set_dataexists(true); this->dataExists = true; return std::make_unique( grpc::Status::OK, true); } this->s3Path = std::make_unique( - generateS3Path(BLOB_BUCKET_NAME, this->blobHash)); + tools::generateS3Path(BLOB_BUCKET_NAME, this->blobHash)); this->blobItem = std::make_shared(this->blobHash, *s3Path); response->set_dataexists(false); return nullptr; } if (request.datachunk().empty()) { return std::make_unique(grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected")); } if (this->uploader == nullptr) { this->uploader = std::make_unique( getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName()); } this->currentChunk += request.datachunk(); if (this->currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { this->uploader->addPart(this->currentChunk); this->currentChunk.clear(); } return nullptr; } void doneCallback() override { if (!this->status.status.ok()) { return; } const database::ReverseIndexItem reverseIndexItem( this->holder, this->blobHash); if (this->uploader == nullptr) { if (!this->dataExists) { throw std::runtime_error("uploader not initialized as expected"); } database::DatabaseManager::getInstance().putReverseIndexItem( reverseIndexItem); return; } if (!this->readingAborted) { return; } if (!currentChunk.empty()) { this->uploader->addPart(this->currentChunk); } this->uploader->finishUpload(); database::DatabaseManager::getInstance().putBlobItem(*this->blobItem); database::DatabaseManager::getInstance().putReverseIndexItem( reverseIndexItem); } }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/S3Tools.cpp b/services/blob/src/S3Tools.cpp index 919d8add4..cd6ac8869 100644 --- a/services/blob/src/S3Tools.cpp +++ b/services/blob/src/S3Tools.cpp @@ -1,47 +1,47 @@ #include "S3Tools.h" #include "Constants.h" #include "GlobalConstants.h" #include "GlobalTools.h" #include "Tools.h" #include #include namespace comm { namespace network { AwsS3Bucket getBucket(const std::string &bucketName) { return AwsS3Bucket(bucketName); } std::vector listBuckets() { Aws::S3::Model::ListBucketsOutcome outcome = getS3Client()->ListBuckets(); std::vector result; if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::Vector buckets = outcome.GetResult().GetBuckets(); for (Aws::S3::Model::Bucket &bucket : buckets) { result.push_back(bucket.GetName()); } return result; } std::unique_ptr getS3Client() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; - if (isDevMode()) { + if (tools::isDevMode()) { config.endpointOverride = Aws::String("localstack:4566"); config.scheme = Aws::Http::Scheme::HTTP; return std::make_unique( config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); } return std::make_unique(config); } } // namespace network } // namespace comm diff --git a/services/blob/src/Tools.cpp b/services/blob/src/Tools.cpp index 22ebbabc3..95e1f7933 100644 --- a/services/blob/src/Tools.cpp +++ b/services/blob/src/Tools.cpp @@ -1,73 +1,75 @@ #include "Tools.h" #include "Constants.h" #include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" #include "GlobalConstants.h" #include "S3Tools.h" #include #include #include #include #include namespace comm { namespace network { +namespace tools { database::S3Path generateS3Path(const std::string &bucketName, const std::string &blobHash) { return database::S3Path(bucketName, blobHash); } std::string computeHashForFile(const database::S3Path &s3Path) { SHA512_CTX ctx; SHA512_Init(&ctx); const std::function callback = [&ctx](const std::string &chunk) { SHA512_Update(&ctx, chunk.data(), chunk.size()); }; getBucket(s3Path.getBucketName()) .getObjectDataChunks( s3Path.getObjectName(), callback, GRPC_CHUNK_SIZE_LIMIT); unsigned char hash[SHA512_DIGEST_LENGTH]; SHA512_Final(hash, &ctx); std::ostringstream hashStream; for (int i = 0; i < SHA512_DIGEST_LENGTH; i++) { hashStream << std::hex << std::setfill('0') << std::setw(2) << std::nouppercase << (int)hash[i]; } return hashStream.str(); } database::S3Path findS3Path(const std::string &holder) { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error( "provided holder: [" + holder + "] has not been found in the database"); } return findS3Path(*reverseIndexItem); } database::S3Path findS3Path(const database::ReverseIndexItem &reverseIndexItem) { std::shared_ptr blobItem = database::DatabaseManager::getInstance().findBlobItem( reverseIndexItem.getBlobHash()); if (blobItem == nullptr) { throw std::runtime_error( "no blob found for blobHash: [" + reverseIndexItem.getBlobHash() + "]"); } database::S3Path result = blobItem->getS3Path(); return result; } +} // namespace tools } // namespace network } // namespace comm diff --git a/services/blob/src/Tools.h b/services/blob/src/Tools.h index 4b5bf58d2..33aa8cadc 100644 --- a/services/blob/src/Tools.h +++ b/services/blob/src/Tools.h @@ -1,26 +1,28 @@ #pragma once #include "DatabaseEntitiesTools.h" #include "S3Path.h" namespace comm { namespace network { +namespace tools { database::S3Path generateS3Path(const std::string &bucketName, const std::string &blobHash); std::string computeHashForFile(const database::S3Path &s3Path); database::S3Path findS3Path(const std::string &holder); database::S3Path findS3Path(const database::ReverseIndexItem &reverseIndexItem); class invalid_argument_error : public std::runtime_error { public: invalid_argument_error(std::string errorMessage) : std::runtime_error(errorMessage) { } }; +} // namespace tools } // namespace network } // namespace comm diff --git a/services/blob/test/MultiPartUploadTest.cpp b/services/blob/test/MultiPartUploadTest.cpp index 8c271fedc..5b1ecdf06 100644 --- a/services/blob/test/MultiPartUploadTest.cpp +++ b/services/blob/test/MultiPartUploadTest.cpp @@ -1,76 +1,77 @@ #include #include "AwsS3Bucket.h" #include "MultiPartUploader.h" #include "S3Tools.h" #include "TestTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class MultiPartUploadTest : public testing::Test { protected: std::shared_ptr s3Client; const std::string bucketName = "commapp-test"; std::unique_ptr bucket; virtual void SetUp() { Aws::InitAPI({}); s3Client = std::move(getS3Client()); bucket = std::make_unique(bucketName); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; std::string generateNByes(const size_t n) { std::string result; result.resize(n); memset((char *)result.data(), 'A', n); return result; } TEST_F(MultiPartUploadTest, ThrowingTooSmallPart) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, ThrowingTooSmallPartOneByte) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - 1)); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, SuccessfulWriteMultipleChunks) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE)); mpu.addPart("xxx"); mpu.finishUpload(); - EXPECT_THROW(bucket->getObjectData(objectName), invalid_argument_error); + EXPECT_THROW( + bucket->getObjectData(objectName), tools::invalid_argument_error); EXPECT_EQ( bucket->getObjectSize(objectName), AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3); bucket->removeObject(objectName); } TEST_F(MultiPartUploadTest, SuccessfulWriteOneChunk) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_EQ(bucket->getObjectSize(objectName), 3); bucket->removeObject(objectName); } diff --git a/services/lib/src/DynamoDBTools.cpp b/services/lib/src/DynamoDBTools.cpp index bcfdad144..91bc48cd1 100644 --- a/services/lib/src/DynamoDBTools.cpp +++ b/services/lib/src/DynamoDBTools.cpp @@ -1,20 +1,20 @@ #include "DynamoDBTools.h" #include "Constants.h" #include "GlobalConstants.h" #include "GlobalTools.h" namespace comm { namespace network { std::unique_ptr getDynamoDBClient() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; - if (isDevMode()) { + if (tools::isDevMode()) { config.endpointOverride = Aws::String("localstack:4566"); config.scheme = Aws::Http::Scheme::HTTP; } return std::make_unique(config); } } // namespace network } // namespace comm diff --git a/services/lib/src/GlobalTools.cpp b/services/lib/src/GlobalTools.cpp index 01ec08223..16d3426a4 100644 --- a/services/lib/src/GlobalTools.cpp +++ b/services/lib/src/GlobalTools.cpp @@ -1,46 +1,48 @@ #include "GlobalTools.h" #include #include #include #include #include #include #include namespace comm { namespace network { +namespace tools { uint64_t getCurrentTimestamp() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()) .count(); } bool hasEnvFlag(const std::string &flag) { if (std::getenv(flag.c_str()) == nullptr) { return false; } return std::string(std::getenv(flag.c_str())) == "1"; } std::string decorateTableName(const std::string &baseName) { std::string suffix = ""; if (hasEnvFlag("COMM_TEST_SERVICES")) { suffix = "-test"; } return baseName + suffix; } bool isDevMode() { return hasEnvFlag("COMM_SERVICES_DEV_MODE"); } std::string generateUUID() { thread_local boost::uuids::random_generator random_generator; return boost::uuids::to_string(random_generator()); } +} // namespace tools } // namespace network } // namespace comm diff --git a/services/lib/src/GlobalTools.h b/services/lib/src/GlobalTools.h index 7b2ff3bec..1e8a417e0 100644 --- a/services/lib/src/GlobalTools.h +++ b/services/lib/src/GlobalTools.h @@ -1,22 +1,24 @@ #pragma once #include #include namespace comm { namespace network { +namespace tools { const std::string ID_SEPARATOR = ":"; uint64_t getCurrentTimestamp(); bool hasEnvFlag(const std::string &flag); std::string decorateTableName(const std::string &baseName); bool isDevMode(); std::string generateUUID(); +} // namespace tools } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp index e4013ab62..b14f805f3 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -1,146 +1,147 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" +#include "GlobalTools.h" #include "Tools.h" #include #include namespace comm { namespace network { AmqpManager &AmqpManager::getInstance() { static AmqpManager instance; return instance; } void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); LOG(INFO) << "AMQP: Connecting to " << amqpUri; auto *loop = uv_default_loop(); AMQP::LibUvHandler handler(loop); AMQP::TcpConnection connection(&handler, AMQP::Address(amqpUri)); this->amqpChannel = std::make_unique(&connection); this->amqpChannel->onError([this](const char *message) { LOG(ERROR) << "AMQP: channel error: " << message << ", will try to reconnect"; this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = std::to_string(AMQP_MESSAGE_TTL); arguments["x-expires"] = std::to_string(AMQP_QUEUE_TTL); this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { LOG(INFO) << "AMQP: Queue " << name << " created"; this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") .onError([this, tunnelbrokerID, fanoutExchangeName]( const char *message) { LOG(ERROR) << "AMQP: Failed to bind queue: " << tunnelbrokerID << " to exchange: " << fanoutExchangeName; this->amqpReady = false; }); this->amqpReady = true; this->amqpChannel->consume(tunnelbrokerID) .onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body()); const std::string messageID(headers[AMQP_HEADER_MESSAGEID]); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); LOG(INFO) << "AMQP: Message consumed for deviceID: " << toDeviceID; DeliveryBroker::getInstance().push( messageID, deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { LOG(ERROR) << "AMQP: Message parsing exception: " << e.what(); } }) .onError([](const char *message) { LOG(ERROR) << "AMQP: Error on message consume: " << message; }); }) .onError([](const char *message) { throw std::runtime_error( "AMQP: Queue creation error: " + std::string(message)); }); uv_run(loop, UV_RUN_DEFAULT); }; void AmqpManager::connect() { while (true) { int64_t currentTimestamp = tools::getCurrentTimestamp(); if (this->lastConnectionTimestamp && currentTimestamp - this->lastConnectionTimestamp < AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { throw std::runtime_error( "AMQP reconnection attempt interval too short, tried to reconnect " "after " + std::to_string(currentTimestamp - this->lastConnectionTimestamp) + "ms, the shortest allowed interval is " + std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); } this->lastConnectionTimestamp = currentTimestamp; this->connectInternal(); } } bool AmqpManager::send( std::string messageID, std::string fromDeviceID, std::string toDeviceID, std::string payload) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message send error: channel not ready"; return false; } try { AMQP::Envelope env(payload.c_str(), payload.size()); AMQP::Table headers; headers[AMQP_HEADER_MESSAGEID] = messageID; headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { LOG(ERROR) << "AMQP: Error while publishing message: " << e.what(); return false; } return true; }; void AmqpManager::ack(uint64_t deliveryTag) { if (!this->amqpReady) { LOG(ERROR) << "AMQP: Message ACK error: channel not ready"; return; } this->amqpChannel->ack(deliveryTag); } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp index a7c31cb67..9382b5b19 100644 --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -1,213 +1,214 @@ #include "DatabaseManager.h" #include "DynamoDBTools.h" +#include "GlobalTools.h" namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } bool DatabaseManager::isTableAvailable(const std::string &tableName) { Aws::DynamoDB::Model::DescribeTableRequest request; request.SetTableName(tableName); // Check table availability by invoking DescribeTable const Aws::DynamoDB::Model::DescribeTableOutcome &result = getDynamoDBClient()->DescribeTable(request); return result.IsSuccess(); } void DatabaseManager::putSessionItem(const DeviceSessionItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(item.getSessionID())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( DeviceSessionItem::FIELD_PUBKEY, Aws::DynamoDB::Model::AttributeValue(item.getPubKey())); request.AddItem( DeviceSessionItem::FIELD_NOTIFY_TOKEN, Aws::DynamoDB::Model::AttributeValue(item.getNotifyToken())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_TYPE, Aws::DynamoDB::Model::AttributeValue(item.getDeviceType())); request.AddItem( DeviceSessionItem::FIELD_APP_VERSION, Aws::DynamoDB::Model::AttributeValue(item.getAppVersion())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_OS, Aws::DynamoDB::Model::AttributeValue(item.getDeviceOs())); request.AddItem( DeviceSessionItem::FIELD_CHECKPOINT_TIME, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getCheckpointTime()))); request.AddItem( DeviceSessionItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionItem(const std::string &sessionID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(sessionID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionItem(const std::string &sessionID) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putSessionSignItem(const SessionSignItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( SessionSignItem::FIELD_SESSION_VERIFICATION, Aws::DynamoDB::Model::AttributeValue(item.getSign())); request.AddItem( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( SessionSignItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_SIGN_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionSignItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionSignItem(const std::string &deviceID) { std::shared_ptr item = this->findSessionSignItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putPublicKeyItem(const PublicKeyItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( PublicKeyItem::FIELD_PUBLIC_KEY, Aws::DynamoDB::Model::AttributeValue(item.getPublicKey())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findPublicKeyItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removePublicKeyItem(const std::string &deviceID) { std::shared_ptr item = this->findPublicKeyItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putMessageItem(const MessageItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); request.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); request.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); request.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); request.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); request.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); request.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findMessageItem(const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); return std::move(this->innerFindItem(request)); } std::vector> DatabaseManager::findMessageItemsByReceiver(const std::string &toDeviceID) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(MessageItem().getTableName()); req.SetKeyConditionExpression( MessageItem::FIELD_TO_DEVICE_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", toDeviceID); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName(MessageItem::INDEX_TO_DEVICE_ID); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeMessageItem(const std::string &messageID) { std::shared_ptr item = this->findMessageItem(messageID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp index f3e6a04cc..4bdc193bb 100644 --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,224 +1,224 @@ #include "TunnelbrokerServiceImpl.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = tools::generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); - const std::string newSessionID = generateUUID(); + const std::string newSessionID = tools::generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { LOG(INFO) << "gRPC: " << "Session sign request not found for deviceID: " << deviceID; return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { LOG(INFO) << "gRPC: " << "The public key doesn't match for deviceID"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { LOG(INFO) << "gRPC: " << "Signature for the verification message is not valid"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'NewSession' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { LOG(INFO) << "gRPC: " << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); - const std::string messageID = generateUUID(); + const std::string messageID = tools::generateUUID(); if (!AmqpManager::getInstance().send( messageID, clientDeviceID, request->todeviceid(), std::string(request->payload()))) { LOG(ERROR) << "gRPC: " << "Error while publish the message to AMQP"; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Send' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { LOG(INFO) << "gRPC: " << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); tunnelbroker::GetResponse response; response.set_fromdeviceid(messageToDeliver.fromDeviceID); response.set_payload(messageToDeliver.payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); if (DeliveryBroker::getInstance().isEmpty(clientDeviceID)) { DeliveryBroker::getInstance().erase(clientDeviceID); } } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Get' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Tools/Tools.cpp b/services/tunnelbroker/src/Tools/Tools.cpp index e873fc84a..d02578dd6 100644 --- a/services/tunnelbroker/src/Tools/Tools.cpp +++ b/services/tunnelbroker/src/Tools/Tools.cpp @@ -1,78 +1,72 @@ #include "Tools.h" #include "ConfigManager.h" #include "Constants.h" #include #include #include #include #include #include namespace comm { namespace network { namespace tools { std::string generateRandomString(std::size_t length) { const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; thread_local std::random_device generator; std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1); std::string random_string; for (std::size_t i = 0; i < length; ++i) { random_string += CHARACTERS[distribution(generator)]; } return random_string; } -int64_t getCurrentTimestamp() { - using namespace std::chrono; - return duration_cast(system_clock::now().time_since_epoch()) - .count(); -} - bool validateDeviceID(std::string deviceID) { try { static const std::regex deviceIDKeyserverRegexp("^ks:.*"); if (std::regex_match(deviceID, deviceIDKeyserverRegexp)) { return ( deviceID == config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DEFAULT_KEYSERVER_ID)); } return std::regex_match(deviceID, DEVICEID_FORMAT_REGEX); } catch (const std::exception &e) { LOG(ERROR) << "Tools: " << "Got an exception at `validateDeviceID`: " << e.what(); return false; } } bool validateSessionID(std::string sessionID) { try { return std::regex_match(sessionID, SESSION_ID_FORMAT_REGEX); } catch (const std::exception &e) { LOG(ERROR) << "Tools: " << "Got an exception at `validateSessionId`: " << e.what(); return false; } } void checkIfNotEmpty(std::string fieldName, std::string stringToCheck) { if (stringToCheck.empty()) { throw std::runtime_error( "Error: Required text field " + fieldName + " is empty."); } } void checkIfNotZero(std::string fieldName, uint64_t numberToCheck) { if (numberToCheck == 0) { throw std::runtime_error( "Error: Required number " + fieldName + " is zero."); } } } // namespace tools } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Tools/Tools.h b/services/tunnelbroker/src/Tools/Tools.h index b5b283891..9cbaa41d6 100644 --- a/services/tunnelbroker/src/Tools/Tools.h +++ b/services/tunnelbroker/src/Tools/Tools.h @@ -1,19 +1,18 @@ #pragma once #include #include namespace comm { namespace network { namespace tools { std::string generateRandomString(std::size_t length); -int64_t getCurrentTimestamp(); bool validateDeviceID(std::string deviceID); bool validateSessionID(std::string sessionID); void checkIfNotEmpty(std::string fieldName, std::string stringToCheck); void checkIfNotZero(std::string fieldName, uint64_t numberToCheck); } // namespace tools } // namespace network } // namespace comm diff --git a/services/tunnelbroker/test/AmqpManagerTest.cpp b/services/tunnelbroker/test/AmqpManagerTest.cpp index 730d2217a..7aa2ebb78 100644 --- a/services/tunnelbroker/test/AmqpManagerTest.cpp +++ b/services/tunnelbroker/test/AmqpManagerTest.cpp @@ -1,76 +1,76 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class AmqpManagerTest : public testing::Test { protected: virtual void SetUp() { config::ConfigManager::getInstance().load(); std::thread amqpThread([]() { AmqpManager::getInstance().connect(); }); } }; TEST_F(AmqpManagerTest, SentAndPopedMessagesAreSameOnStaticData) { const std::string messageID = "bc0c1aa2-bf09-11ec-9d64-0242ac120002"; const std::string toDeviceID = "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const std::string fromDeviceID = "web:JouLWf84zqRIsjBdHLOcHS9M4eSCz7VF84wT1uOD83u1qxDAqmqI4swmxNINjuhd"; const std::string payload = "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe"; EXPECT_EQ( AmqpManager::getInstance().send( messageID, toDeviceID, fromDeviceID, payload), true); DeliveryBrokerMessage receivedMessage = DeliveryBroker::getInstance().pop(toDeviceID); EXPECT_EQ(messageID, receivedMessage.messageID); EXPECT_EQ(fromDeviceID, receivedMessage.fromDeviceID); EXPECT_EQ(payload, receivedMessage.payload); AmqpManager::getInstance().ack(receivedMessage.deliveryTag); } TEST_F(AmqpManagerTest, SentAndPopedMessagesAreSameOnGeneratedData) { - const std::string messageID = generateUUID(); + const std::string messageID = tools::generateUUID(); const std::string toDeviceID = "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); const std::string fromDeviceID = "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); const std::string payload = tools::generateRandomString(512); EXPECT_EQ( AmqpManager::getInstance().send( messageID, toDeviceID, fromDeviceID, payload), true); DeliveryBrokerMessage receivedMessage = DeliveryBroker::getInstance().pop(toDeviceID); EXPECT_EQ(messageID, receivedMessage.messageID) << "Generated messageID \"" << messageID << "\" differs from what was got from amqp message " << receivedMessage.messageID; EXPECT_EQ(fromDeviceID, receivedMessage.fromDeviceID) << "Generated FromDeviceID \"" << fromDeviceID << "\" differs from what was got from amqp message " << receivedMessage.fromDeviceID; EXPECT_EQ(payload, receivedMessage.payload) << "Generated Payload \"" << payload << "\" differs from what was got from amqp message " << receivedMessage.payload; AmqpManager::getInstance().ack(receivedMessage.deliveryTag); } diff --git a/services/tunnelbroker/test/DatabaseManagerTest.cpp b/services/tunnelbroker/test/DatabaseManagerTest.cpp index a136d489b..4cafca55e 100644 --- a/services/tunnelbroker/test/DatabaseManagerTest.cpp +++ b/services/tunnelbroker/test/DatabaseManagerTest.cpp @@ -1,300 +1,300 @@ #include "DatabaseManager.h" #include "ConfigManager.h" #include "Constants.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include using namespace comm::network; class DatabaseManagerTest : public testing::Test { protected: virtual void SetUp() { config::ConfigManager::getInstance().load(); Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; TEST_F(DatabaseManagerTest, PutAndFoundMessageItemsStaticDataIsSame) { const database::MessageItem item( "bc0c1aa2-bf09-11ec-9d64-0242ac120002", "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", "web:JouLWf84zqRIsjBdHLOcHS9M4eSCz7VF84wT1uOD83u1qxDAqmqI4swmxNINjuhd", "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe", "7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8"); const size_t currentTimestamp = tools::getCurrentTimestamp(); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findMessageItem( item.getMessageID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getFromDeviceID(), foundItem->getFromDeviceID()); EXPECT_EQ(item.getToDeviceID(), foundItem->getToDeviceID()); EXPECT_EQ(item.getPayload(), foundItem->getPayload()); EXPECT_EQ(item.getBlobHashes(), foundItem->getBlobHashes()); EXPECT_EQ( (foundItem->getExpire() >= static_cast(std::time(0))) && (foundItem->getExpire() <= static_cast(std::time(0) + MESSAGE_RECORD_TTL)), true); EXPECT_EQ( foundItem->getCreatedAt() >= currentTimestamp && foundItem->getCreatedAt() <= tools::getCurrentTimestamp(), true); database::DatabaseManager::getInstance().removeMessageItem( item.getMessageID()); } TEST_F(DatabaseManagerTest, PutAndFoundMessageItemsGeneratedDataIsSame) { const database::MessageItem item( - generateUUID(), + tools::generateUUID(), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), tools::generateRandomString(256), tools::generateRandomString(256)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findMessageItem( item.getMessageID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getFromDeviceID(), foundItem->getFromDeviceID()) << "Generated FromDeviceID \"" << item.getFromDeviceID() << "\" differs from what is found in the database " << foundItem->getFromDeviceID(); EXPECT_EQ(item.getToDeviceID(), foundItem->getToDeviceID()) << "Generated ToDeviceID \"" << item.getToDeviceID() << "\" differs from what is found in the database " << foundItem->getToDeviceID(); EXPECT_EQ(item.getPayload(), foundItem->getPayload()) << "Generated Payload \"" << item.getPayload() << "\" differs from what is found in the database " << foundItem->getPayload(); EXPECT_EQ(item.getBlobHashes(), foundItem->getBlobHashes()) << "Generated BlobHashes \"" << item.getBlobHashes() << "\" differs from what is found in the database " << foundItem->getBlobHashes(); database::DatabaseManager::getInstance().removeMessageItem( item.getMessageID()); } TEST_F(DatabaseManagerTest, PutAndFoundDeviceSessionItemStaticDataIsSame) { const database::DeviceSessionItem item( "bc0c1aa2-bf09-11ec-9d64-0242ac120002", "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC9Q9wodsQdZNynbTnC35hA4mFW" "mwZf9BhbI93aGAwPF9au0eYsawRz0jtYi4lSFXC9KleyQDg+6J+UW1kiWvE3ZRYG" "ECqgx4zqajPTzVt7EAOGaIh/dPyQ6x2Ul1GlkkSYXUhhixEzExGp9g84eCyVkbCB" "U3SK6SNKyR7anAXDVQIDAQAB", "hbI93aGAwPF9au0eYsawRz0jtYi4lSFXC9KleyQDg+6J+UW1kiWvE3", "phone", "ios:1.1.1", "iOS 99.99.99"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionItem( item.getSessionID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getDeviceID(), foundItem->getDeviceID()); EXPECT_EQ(item.getPubKey(), foundItem->getPubKey()); EXPECT_EQ(item.getNotifyToken(), foundItem->getNotifyToken()); EXPECT_EQ(item.getDeviceType(), foundItem->getDeviceType()); EXPECT_EQ(item.getAppVersion(), foundItem->getAppVersion()); EXPECT_EQ(item.getDeviceOs(), foundItem->getDeviceOs()); database::DatabaseManager::getInstance().removeSessionItem( item.getSessionID()); } TEST_F(DatabaseManagerTest, PutAndFoundDeviceSessionItemGeneratedDataIsSame) { const database::DeviceSessionItem item( tools::generateUUID(), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), tools::generateRandomString(451), tools::generateRandomString(64), tools::generateRandomString(12), tools::generateRandomString(12), tools::generateRandomString(12)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionItem( item.getSessionID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getDeviceID(), foundItem->getDeviceID()) << "Generated DeviceID \"" << item.getDeviceID() << "\" differs from what is found in the database " << foundItem->getDeviceID(); EXPECT_EQ(item.getPubKey(), foundItem->getPubKey()) << "Generated PubKey \"" << item.getPubKey() << "\" differs from what is found in the database " << foundItem->getPubKey(); EXPECT_EQ(item.getNotifyToken(), foundItem->getNotifyToken()) << "Generated NotifyToken \"" << item.getNotifyToken() << "\" differs from what is found in the database " << foundItem->getNotifyToken(); EXPECT_EQ(item.getDeviceType(), foundItem->getDeviceType()) << "Generated DeviceType \"" << item.getDeviceType() << "\" differs from what is found in the database " << foundItem->getDeviceType(); EXPECT_EQ(item.getAppVersion(), foundItem->getAppVersion()) << "Generated AppVersion \"" << item.getAppVersion() << "\" differs from what is found in the database " << foundItem->getAppVersion(); EXPECT_EQ(item.getDeviceOs(), foundItem->getDeviceOs()) << "Generated DeviceOS \"" << item.getDeviceOs() << "\" differs from what is found in the database " << foundItem->getDeviceOs(); database::DatabaseManager::getInstance().removeSessionItem( item.getSessionID()); } TEST_F(DatabaseManagerTest, PutAndFoundSessionSignItemStaticDataIsSame) { const database::SessionSignItem item( "bB3OSLdKlY60KPBpw6VoGKX7Lmw3SA07FmNhnqnclvVeaxXueAQ0dpQSpiQTtlGn", "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionSignItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionSignItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getSign(), foundItem->getSign()); database::DatabaseManager::getInstance().removeSessionSignItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundSessionSignItemGeneratedDataIsSame) { const database::SessionSignItem item( tools::generateRandomString(SIGNATURE_REQUEST_LENGTH), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionSignItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionSignItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr) << "Item with the key of deviceID \"" << item.getDeviceID() << "\" is not found"; EXPECT_EQ(item.getSign(), foundItem->getSign()) << "Generated signature value \"" << item.getSign() << "\" is not equal of \"" + foundItem->getSign() + "\" from the database value"; database::DatabaseManager::getInstance().removeSessionSignItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundPublicKeyItemsStaticDataIsSame) { const database::PublicKeyItem item( "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC9Q9wodsQdZNynbTnC35hA4mFW" "mwZf9BhbI93aGAwPF9au0eYsawRz0jtYi4lSFXC9KleyQDg+6J+UW1kiWvE3ZRYG" "ECqgx4zqajPTzVt7EAOGaIh/dPyQ6x2Ul1GlkkSYXUhhixEzExGp9g84eCyVkbCB" "U3SK6SNKyR7anAXDVQIDAQAB"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putPublicKeyItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findPublicKeyItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getPublicKey(), foundItem->getPublicKey()); database::DatabaseManager::getInstance().removePublicKeyItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundPublicKeyItemsGeneratedDataIsSame) { const database::PublicKeyItem item( "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), tools::generateRandomString(451)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putPublicKeyItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findPublicKeyItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getPublicKey(), foundItem->getPublicKey()) << "Generated PublicKey \"" << item.getPublicKey() << "\" differs from what is found in the database " << foundItem->getPublicKey(); database::DatabaseManager::getInstance().removePublicKeyItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundByReceiverMessageItemsDataIsSame) { const std::string receiverID = "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const database::MessageItem item( "bc0c1aa2-bf09-11ec-9d64-0242ac120002", "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", receiverID, "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe", "7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(item); std::vector> foundItems = database::DatabaseManager::getInstance().findMessageItemsByReceiver( receiverID); EXPECT_NE(foundItems.size(), 0); EXPECT_EQ(item.getFromDeviceID(), foundItems[0]->getFromDeviceID()); EXPECT_EQ(item.getToDeviceID(), foundItems[0]->getToDeviceID()); EXPECT_EQ(item.getPayload(), foundItems[0]->getPayload()); EXPECT_EQ(item.getBlobHashes(), foundItems[0]->getBlobHashes()); EXPECT_EQ( (foundItems[0]->getExpire() >= static_cast(std::time(0))) && (foundItems[0]->getExpire() <= static_cast(std::time(0) + MESSAGE_RECORD_TTL)), true); database::DatabaseManager::getInstance().removeMessageItem( item.getMessageID()); } diff --git a/services/tunnelbroker/test/DeliveryBrokerTest.cpp b/services/tunnelbroker/test/DeliveryBrokerTest.cpp index 2b060d2dd..3de5711c6 100644 --- a/services/tunnelbroker/test/DeliveryBrokerTest.cpp +++ b/services/tunnelbroker/test/DeliveryBrokerTest.cpp @@ -1,134 +1,134 @@ #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class DeliveryBrokerTest : public testing::Test {}; TEST(DeliveryBrokerTest, CheckPushAndPopOnStaticValues) { const std::string toDeviceID = "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const DeliveryBrokerMessage message{ .messageID = "bc0c1aa2-bf09-11ec-9d64-0242ac120002", .deliveryTag = 99, .fromDeviceID = "mobile:" "uTfNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdooLkRt0S6TLzZWNM6XSvdWLop", .payload = "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe"}; DeliveryBroker::getInstance().push( message.messageID, message.deliveryTag, toDeviceID, message.fromDeviceID, message.payload); DeliveryBrokerMessage receivedMessage = DeliveryBroker::getInstance().pop(toDeviceID); EXPECT_EQ(message.messageID, receivedMessage.messageID); EXPECT_EQ(message.deliveryTag, receivedMessage.deliveryTag); EXPECT_EQ(message.fromDeviceID, receivedMessage.fromDeviceID); EXPECT_EQ(message.payload, receivedMessage.payload); } TEST(DeliveryBrokerTest, CheckPushAndPopOnGeneratedValues) { const std::string toDeviceID = "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); const DeliveryBrokerMessage message{ - .messageID = generateUUID(), + .messageID = tools::generateUUID(), .deliveryTag = static_cast(std::time(0)), .fromDeviceID = "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), .payload = tools::generateRandomString(512)}; DeliveryBroker::getInstance().push( message.messageID, message.deliveryTag, toDeviceID, message.fromDeviceID, message.payload); DeliveryBrokerMessage receivedMessage = DeliveryBroker::getInstance().pop(toDeviceID); EXPECT_EQ(message.messageID, receivedMessage.messageID) << "Generated MessageID \"" << message.messageID << "\" differs from what was received " << receivedMessage.messageID; EXPECT_EQ(message.deliveryTag, receivedMessage.deliveryTag) << "Generated DeliveryTag \"" << message.deliveryTag << "\" differs from what was received " << receivedMessage.deliveryTag; EXPECT_EQ(message.fromDeviceID, receivedMessage.fromDeviceID) << "Generated FromDeviceID \"" << message.fromDeviceID << "\" differs from what was received " << receivedMessage.fromDeviceID; EXPECT_EQ(message.payload, receivedMessage.payload) << "Generated Payload \"" << message.payload << "\" differs from what was received " << receivedMessage.payload; } TEST(DeliveryBrokerTest, IsEmptyShoudBeFalseAfterPush) { const std::string deviceID = "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const DeliveryBrokerMessage message{ .messageID = "bc0c1aa2-bf09-11ec-9d64-0242ac120002", .deliveryTag = 99, .fromDeviceID = "mobile:" "uTfNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdooLkRt0S6TLzZWNM6XSvdWLop", .payload = "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe"}; EXPECT_EQ(DeliveryBroker::getInstance().isEmpty(deviceID), true); DeliveryBroker::getInstance().push( message.messageID, message.deliveryTag, deviceID, message.fromDeviceID, message.payload); EXPECT_EQ(DeliveryBroker::getInstance().isEmpty(deviceID), false); } TEST(DeliveryBrokerTest, ShouldBeEmptyAfterErase) { const std::string deviceID = "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const DeliveryBrokerMessage message{ .messageID = "bc0c1aa2-bf09-11ec-9d64-0242ac120002", .deliveryTag = 99, .fromDeviceID = "mobile:" "uTfNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdooLkRt0S6TLzZWNM6XSvdWLop", .payload = "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe"}; DeliveryBroker::getInstance().push( message.messageID, message.deliveryTag, deviceID, message.fromDeviceID, message.payload); EXPECT_EQ(DeliveryBroker::getInstance().isEmpty(deviceID), false); DeliveryBroker::getInstance().erase(deviceID); EXPECT_EQ(DeliveryBroker::getInstance().isEmpty(deviceID), true); } diff --git a/services/tunnelbroker/test/ToolsTest.cpp b/services/tunnelbroker/test/ToolsTest.cpp index 7539808b4..c38d19419 100644 --- a/services/tunnelbroker/test/ToolsTest.cpp +++ b/services/tunnelbroker/test/ToolsTest.cpp @@ -1,73 +1,73 @@ #include "Tools.h" #include "Constants.h" #include "GlobalTools.h" #include #include using namespace comm::network; class ToolsTest : public testing::Test {}; TEST(ToolsTest, GeneratedRandomStringHasValidLength) { const std::size_t length = 32; const std::string generated = tools::generateRandomString(length); EXPECT_EQ(generated.length(), length) << "Generated random string \"" << generated << "\" length " << generated.length() << " is not equal to " << length; } TEST(ToolsTest, ValidateDeviceIDReturnsTrueOnStaticValidDeviceID) { const std::string validDeviceID = "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; EXPECT_EQ(tools::validateDeviceID(validDeviceID), true) << "Valid deviceID \"" << validDeviceID << "\" is invalid by the function"; } TEST(ToolsTest, ValidateDeviceIDReturnsTrueOnGeneratedValidDeviceID) { const std::string validDeviceID = "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); EXPECT_EQ(tools::validateDeviceID(validDeviceID), true) << "Valid generated deviceID \"" << validDeviceID << "\" is invalid by the function"; } TEST(ToolsTest, ValidateDeviceIDReturnsFalseOnInvalidDeviceIDPrefix) { const std::string invalidDeviceIDPrefix = "invalid-" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; EXPECT_EQ(tools::validateDeviceID(invalidDeviceIDPrefix), false) << "Invalid prefix deviceID \"" << invalidDeviceIDPrefix << "\" is valid by the function"; } TEST(ToolsTest, ValidateDeviceIDReturnsFalseOnInvalidDeviceIDSuffix) { const std::string invalidDeviceIDSuffix = "mobile:tQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; EXPECT_EQ(tools::validateDeviceID(invalidDeviceIDSuffix), false) << "Invalid suffix deviceID \"" << invalidDeviceIDSuffix << "\" is valid by the function"; } TEST(ToolsTest, ValidateSessionIDReturnsTrueOnValidStaticSessionID) { const std::string validSessionID = "bc0c1aa2-bf09-11ec-9d64-0242ac120002"; EXPECT_EQ(tools::validateSessionID(validSessionID), true) << "Valid sessionID \"" << validSessionID << "\" is invalid by the function"; } TEST(ToolsTest, ValidateSessionIDReturnsTrueOnValidGeneratedSessionID) { - const std::string validSessionID = generateUUID(); + const std::string validSessionID = tools::generateUUID(); EXPECT_EQ(tools::validateSessionID(validSessionID), true) << "Valid generated sessionID \"" << validSessionID << "\" is invalid by the function"; } TEST(ToolsTest, ValidateSessionIDReturnsFalseOnInvalidSessionID) { const std::string invalidSessionID = "bc0c1aa29bf09-11ec-9d64-0242ac120002"; EXPECT_EQ(tools::validateSessionID(invalidSessionID), false) << "Invalid sessionID \"" << invalidSessionID << "\" is valid by the function"; }