diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp index 38349e472..0bdcbd30a 100644 --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -1,265 +1,272 @@ #include "DatabaseManager.h" #include "DynamoDBTools.h" #include "GlobalTools.h" namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } bool DatabaseManager::isTableAvailable(const std::string &tableName) { Aws::DynamoDB::Model::DescribeTableRequest request; request.SetTableName(tableName); // Check table availability by invoking DescribeTable const Aws::DynamoDB::Model::DescribeTableOutcome &result = getDynamoDBClient()->DescribeTable(request); return result.IsSuccess(); } void DatabaseManager::putSessionItem(const DeviceSessionItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(item.getSessionID())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( DeviceSessionItem::FIELD_PUBKEY, Aws::DynamoDB::Model::AttributeValue(item.getPubKey())); request.AddItem( DeviceSessionItem::FIELD_NOTIFY_TOKEN, Aws::DynamoDB::Model::AttributeValue(item.getNotifyToken())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_TYPE, Aws::DynamoDB::Model::AttributeValue(item.getDeviceType())); request.AddItem( DeviceSessionItem::FIELD_APP_VERSION, Aws::DynamoDB::Model::AttributeValue(item.getAppVersion())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_OS, Aws::DynamoDB::Model::AttributeValue(item.getDeviceOs())); request.AddItem( DeviceSessionItem::FIELD_CHECKPOINT_TIME, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getCheckpointTime()))); request.AddItem( DeviceSessionItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionItem(const std::string &sessionID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(sessionID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionItem(const std::string &sessionID) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putSessionSignItem(const SessionSignItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( SessionSignItem::FIELD_SESSION_VERIFICATION, Aws::DynamoDB::Model::AttributeValue(item.getSign())); request.AddItem( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( SessionSignItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_SIGN_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionSignItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionSignItem(const std::string &deviceID) { std::shared_ptr item = this->findSessionSignItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putPublicKeyItem(const PublicKeyItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( PublicKeyItem::FIELD_PUBLIC_KEY, Aws::DynamoDB::Model::AttributeValue(item.getPublicKey())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findPublicKeyItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removePublicKeyItem(const std::string &deviceID) { std::shared_ptr item = this->findPublicKeyItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } template T DatabaseManager::populatePutRequestFromMessageItem( T &putRequest, const MessageItem &item) { putRequest.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); putRequest.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); putRequest.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); putRequest.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); putRequest.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); putRequest.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); putRequest.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); return putRequest; } void DatabaseManager::putMessageItem(const MessageItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request = this->populatePutRequestFromMessageItem(request, item); request.SetTableName(item.getTableName()); this->innerPutItem(std::make_shared(item), request); } void DatabaseManager::putMessageItemsByBatch( std::vector &messageItems) { std::vector writeRequests; for (MessageItem &messageItem : messageItems) { Aws::DynamoDB::Model::PutRequest putRequest; putRequest = this->populatePutRequestFromMessageItem(putRequest, messageItem); Aws::DynamoDB::Model::WriteRequest writeRequest; writeRequest.SetPutRequest(putRequest); writeRequests.push_back(writeRequest); } this->innerBatchWriteItem( MESSAGES_TABLE_NAME, DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } -std::shared_ptr -DatabaseManager::findMessageItem(const std::string &messageID) { +std::shared_ptr DatabaseManager::findMessageItem( + const std::string &toDeviceID, + const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request; + request.AddKey( + MessageItem::FIELD_TO_DEVICE_ID, + Aws::DynamoDB::Model::AttributeValue(toDeviceID)); request.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); return std::move(this->innerFindItem(request)); } std::vector> DatabaseManager::findMessageItemsByReceiver(const std::string &toDeviceID) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(MessageItem().getTableName()); req.SetKeyConditionExpression( MessageItem::FIELD_TO_DEVICE_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", toDeviceID); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName(MessageItem::INDEX_TO_DEVICE_ID); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } -void DatabaseManager::removeMessageItem(const std::string &messageID) { - std::shared_ptr item = this->findMessageItem(messageID); +void DatabaseManager::removeMessageItem( + const std::string &toDeviceID, + const std::string &messageID) { + std::shared_ptr item = + this->findMessageItem(toDeviceID, messageID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::removeMessageItemsByIDsForDeviceID( std::vector &messageIDs, const std::string &toDeviceID) { std::vector writeRequests; for (std::string &messageID : messageIDs) { Aws::DynamoDB::Model::DeleteRequest deleteRequest; deleteRequest.AddKey( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(toDeviceID)); deleteRequest.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); Aws::DynamoDB::Model::WriteRequest currentWriteRequest; currentWriteRequest.SetDeleteRequest(deleteRequest); writeRequests.push_back(currentWriteRequest); } this->innerBatchWriteItem( MESSAGES_TABLE_NAME, DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DatabaseManager.h b/services/tunnelbroker/src/Database/DatabaseManager.h index 354c60de9..31728f91d 100644 --- a/services/tunnelbroker/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/src/Database/DatabaseManager.h @@ -1,62 +1,65 @@ #pragma once #include "AwsTools.h" #include "Constants.h" #include "DatabaseEntitiesTools.h" #include "DatabaseManagerBase.h" #include "Tools.h" #include #include #include #include #include #include #include #include #include #include #include namespace comm { namespace network { namespace database { class DatabaseManager : public DatabaseManagerBase { private: template T populatePutRequestFromMessageItem(T &putRequest, const MessageItem &item); public: static DatabaseManager &getInstance(); bool isTableAvailable(const std::string &tableName); void putSessionItem(const DeviceSessionItem &item); std::shared_ptr findSessionItem(const std::string &deviceID); void removeSessionItem(const std::string &sessionID); void putSessionSignItem(const SessionSignItem &item); std::shared_ptr findSessionSignItem(const std::string &deviceID); void removeSessionSignItem(const std::string &deviceID); void putPublicKeyItem(const PublicKeyItem &item); std::shared_ptr findPublicKeyItem(const std::string &deviceID); void removePublicKeyItem(const std::string &deviceID); void putMessageItem(const MessageItem &item); void putMessageItemsByBatch(std::vector &messageItems); - std::shared_ptr findMessageItem(const std::string &messageID); + std::shared_ptr + findMessageItem(const std::string &toDeviceID, const std::string &messageID); std::vector> findMessageItemsByReceiver(const std::string &toDeviceID); - void removeMessageItem(const std::string &messageID); + void removeMessageItem( + const std::string &toDeviceID, + const std::string &messageID); void removeMessageItemsByIDsForDeviceID( std::vector &messageIDs, const std::string &toDeviceID); }; } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp index bc6a3989e..0923198a9 100644 --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,255 +1,255 @@ #include "TunnelbrokerServiceImpl.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = tools::generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << deviceID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); const std::string newSessionID = tools::generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { LOG(INFO) << "gRPC: " << "Session sign request not found for deviceID: " << deviceID; return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { LOG(INFO) << "gRPC: " << "The public key doesn't match for deviceID"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { LOG(INFO) << "gRPC: " << "Signature for the verification message is not valid"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'NewSession' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { LOG(INFO) << "gRPC: " << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); const database::MessageItem message( messageID, clientDeviceID, request->todeviceid(), request->payload(), ""); database::DatabaseManager::getInstance().putMessageItem(message); if (!AmqpManager::getInstance().send(&message)) { LOG(ERROR) << "gRPC: " << "Error while publish the message to AMQP"; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Send' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { LOG(INFO) << "gRPC: " << "Format validation failed for " << sessionID; return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { LOG(INFO) << "gRPC: " << "Session " << sessionID << " not found"; return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; std::vector> messagesFromDatabase = database::DatabaseManager::getInstance().findMessageItemsByReceiver( clientDeviceID); if (messagesFromDatabase.size() > 0) { // When a client connects and requests GET for the messages first we check // if there are undelivered messages in the database. If so, we are // erasing the messages to deliver from rabbitMQ which are handled by // DeliveryBroker. DeliveryBroker::getInstance().erase(clientDeviceID); } tunnelbroker::GetResponse response; auto respondToWriter = [&writer, &response](std::string fromDeviceID, std::string payload) { response.set_fromdeviceid(fromDeviceID); response.set_payload(payload); if (!writer->Write(response)) { throw std::runtime_error( "gRPC: 'Get' writer error on sending data to the client"); } response.Clear(); }; for (auto &messageFromDatabase : messagesFromDatabase) { respondToWriter( messageFromDatabase->getFromDeviceID(), messageFromDatabase->getPayload()); database::DatabaseManager::getInstance().removeMessageItem( - messageFromDatabase->getMessageID()); + clientDeviceID, messageFromDatabase->getMessageID()); } while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); respondToWriter(messageToDeliver.fromDeviceID, messageToDeliver.payload); comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); database::DatabaseManager::getInstance().removeMessageItem( - messageToDeliver.messageID); + clientDeviceID, messageToDeliver.messageID); // If messages queue for `clientDeviceID` is empty we don't need to store // `folly::MPMCQueue` for it and need to free memory to fix possible // 'ghost' queues in DeliveryBroker. // We call `deleteQueueIfEmpty()` for this purpose here. DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Get' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/test/DatabaseManagerTest.cpp b/services/tunnelbroker/test/DatabaseManagerTest.cpp index 9fc37c8a1..68103c518 100644 --- a/services/tunnelbroker/test/DatabaseManagerTest.cpp +++ b/services/tunnelbroker/test/DatabaseManagerTest.cpp @@ -1,388 +1,388 @@ #include "DatabaseManager.h" #include "ConfigManager.h" #include "Constants.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include #include using namespace comm::network; class DatabaseManagerTest : public testing::Test { protected: virtual void SetUp() { config::ConfigManager::getInstance().load(); Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; TEST_F(DatabaseManagerTest, PutAndFoundMessageItemsStaticDataIsSame) { const database::MessageItem item( "bc0c1aa2-bf09-11ec-9d64-0242ac120002", "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", "web:JouLWf84zqRIsjBdHLOcHS9M4eSCz7VF84wT1uOD83u1qxDAqmqI4swmxNINjuhd", "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe", "7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8"); const size_t currentTimestamp = tools::getCurrentTimestamp(); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findMessageItem( - item.getMessageID()); + item.getToDeviceID(), item.getMessageID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getFromDeviceID(), foundItem->getFromDeviceID()); EXPECT_EQ(item.getToDeviceID(), foundItem->getToDeviceID()); EXPECT_EQ(item.getPayload(), foundItem->getPayload()); EXPECT_EQ(item.getBlobHashes(), foundItem->getBlobHashes()); EXPECT_EQ( (foundItem->getExpire() >= static_cast(std::time(0))) && (foundItem->getExpire() <= static_cast(std::time(0) + MESSAGE_RECORD_TTL)), true); EXPECT_EQ( foundItem->getCreatedAt() >= currentTimestamp && foundItem->getCreatedAt() <= tools::getCurrentTimestamp(), true); database::DatabaseManager::getInstance().removeMessageItem( - item.getMessageID()); + item.getToDeviceID(), item.getMessageID()); } TEST_F(DatabaseManagerTest, PutAndFoundMessageItemsGeneratedDataIsSame) { const database::MessageItem item( tools::generateUUID(), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), tools::generateRandomString(256), tools::generateRandomString(256)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findMessageItem( - item.getMessageID()); + item.getToDeviceID(), item.getMessageID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getFromDeviceID(), foundItem->getFromDeviceID()) << "Generated FromDeviceID \"" << item.getFromDeviceID() << "\" differs from what is found in the database " << foundItem->getFromDeviceID(); EXPECT_EQ(item.getToDeviceID(), foundItem->getToDeviceID()) << "Generated ToDeviceID \"" << item.getToDeviceID() << "\" differs from what is found in the database " << foundItem->getToDeviceID(); EXPECT_EQ(item.getPayload(), foundItem->getPayload()) << "Generated Payload \"" << item.getPayload() << "\" differs from what is found in the database " << foundItem->getPayload(); EXPECT_EQ(item.getBlobHashes(), foundItem->getBlobHashes()) << "Generated BlobHashes \"" << item.getBlobHashes() << "\" differs from what is found in the database " << foundItem->getBlobHashes(); database::DatabaseManager::getInstance().removeMessageItem( - item.getMessageID()); + item.getToDeviceID(), item.getMessageID()); } TEST_F(DatabaseManagerTest, BatchPutAndFoundMessagesItemsCountIsSame) { const std::string receiverID = "web:JouLWf84zqRIsjBdHLOcHS9M4eSCz7VF84wT1uOD83u1qxDAqmqI4swmxNINjuhd"; const size_t itemsSize = 29; std::vector messageItems; for (size_t i = 1; i <= itemsSize; ++i) { database::MessageItem item{ tools::generateUUID(), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), receiverID, tools::generateRandomString(256), tools::generateRandomString(256)}; messageItems.push_back(item); } EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( MESSAGES_TABLE_NAME), true); database::DatabaseManager::getInstance().putMessageItemsByBatch(messageItems); std::vector> foundItems = database::DatabaseManager::getInstance().findMessageItemsByReceiver( receiverID); EXPECT_EQ(foundItems.size(), itemsSize); for (std::shared_ptr messageItem : foundItems) { database::DatabaseManager::getInstance().removeMessageItem( - messageItem->getMessageID()); + messageItem->getToDeviceID(), messageItem->getMessageID()); } } TEST_F(DatabaseManagerTest, PutAndFoundDeviceSessionItemStaticDataIsSame) { const database::DeviceSessionItem item( "bc0c1aa2-bf09-11ec-9d64-0242ac120002", "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC9Q9wodsQdZNynbTnC35hA4mFW" "mwZf9BhbI93aGAwPF9au0eYsawRz0jtYi4lSFXC9KleyQDg+6J+UW1kiWvE3ZRYG" "ECqgx4zqajPTzVt7EAOGaIh/dPyQ6x2Ul1GlkkSYXUhhixEzExGp9g84eCyVkbCB" "U3SK6SNKyR7anAXDVQIDAQAB", "hbI93aGAwPF9au0eYsawRz0jtYi4lSFXC9KleyQDg+6J+UW1kiWvE3", "phone", "ios:1.1.1", "iOS 99.99.99"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionItem( item.getSessionID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getDeviceID(), foundItem->getDeviceID()); EXPECT_EQ(item.getPubKey(), foundItem->getPubKey()); EXPECT_EQ(item.getNotifyToken(), foundItem->getNotifyToken()); EXPECT_EQ(item.getDeviceType(), foundItem->getDeviceType()); EXPECT_EQ(item.getAppVersion(), foundItem->getAppVersion()); EXPECT_EQ(item.getDeviceOs(), foundItem->getDeviceOs()); database::DatabaseManager::getInstance().removeSessionItem( item.getSessionID()); } TEST_F(DatabaseManagerTest, PutAndFoundDeviceSessionItemGeneratedDataIsSame) { const database::DeviceSessionItem item( tools::generateUUID(), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), tools::generateRandomString(451), tools::generateRandomString(64), tools::generateRandomString(12), tools::generateRandomString(12), tools::generateRandomString(12)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionItem( item.getSessionID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getDeviceID(), foundItem->getDeviceID()) << "Generated DeviceID \"" << item.getDeviceID() << "\" differs from what is found in the database " << foundItem->getDeviceID(); EXPECT_EQ(item.getPubKey(), foundItem->getPubKey()) << "Generated PubKey \"" << item.getPubKey() << "\" differs from what is found in the database " << foundItem->getPubKey(); EXPECT_EQ(item.getNotifyToken(), foundItem->getNotifyToken()) << "Generated NotifyToken \"" << item.getNotifyToken() << "\" differs from what is found in the database " << foundItem->getNotifyToken(); EXPECT_EQ(item.getDeviceType(), foundItem->getDeviceType()) << "Generated DeviceType \"" << item.getDeviceType() << "\" differs from what is found in the database " << foundItem->getDeviceType(); EXPECT_EQ(item.getAppVersion(), foundItem->getAppVersion()) << "Generated AppVersion \"" << item.getAppVersion() << "\" differs from what is found in the database " << foundItem->getAppVersion(); EXPECT_EQ(item.getDeviceOs(), foundItem->getDeviceOs()) << "Generated DeviceOS \"" << item.getDeviceOs() << "\" differs from what is found in the database " << foundItem->getDeviceOs(); database::DatabaseManager::getInstance().removeSessionItem( item.getSessionID()); } TEST_F(DatabaseManagerTest, PutAndFoundSessionSignItemStaticDataIsSame) { const database::SessionSignItem item( "bB3OSLdKlY60KPBpw6VoGKX7Lmw3SA07FmNhnqnclvVeaxXueAQ0dpQSpiQTtlGn", "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionSignItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionSignItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getSign(), foundItem->getSign()); database::DatabaseManager::getInstance().removeSessionSignItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundSessionSignItemGeneratedDataIsSame) { const database::SessionSignItem item( tools::generateRandomString(SIGNATURE_REQUEST_LENGTH), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putSessionSignItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findSessionSignItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr) << "Item with the key of deviceID \"" << item.getDeviceID() << "\" is not found"; EXPECT_EQ(item.getSign(), foundItem->getSign()) << "Generated signature value \"" << item.getSign() << "\" is not equal of \"" + foundItem->getSign() + "\" from the database value"; database::DatabaseManager::getInstance().removeSessionSignItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundPublicKeyItemsStaticDataIsSame) { const database::PublicKeyItem item( "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC9Q9wodsQdZNynbTnC35hA4mFW" "mwZf9BhbI93aGAwPF9au0eYsawRz0jtYi4lSFXC9KleyQDg+6J+UW1kiWvE3ZRYG" "ECqgx4zqajPTzVt7EAOGaIh/dPyQ6x2Ul1GlkkSYXUhhixEzExGp9g84eCyVkbCB" "U3SK6SNKyR7anAXDVQIDAQAB"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putPublicKeyItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findPublicKeyItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getPublicKey(), foundItem->getPublicKey()); database::DatabaseManager::getInstance().removePublicKeyItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundPublicKeyItemsGeneratedDataIsSame) { const database::PublicKeyItem item( "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), tools::generateRandomString(451)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putPublicKeyItem(item); std::shared_ptr foundItem = database::DatabaseManager::getInstance().findPublicKeyItem( item.getDeviceID()); EXPECT_NE(foundItem, nullptr); EXPECT_EQ(item.getPublicKey(), foundItem->getPublicKey()) << "Generated PublicKey \"" << item.getPublicKey() << "\" differs from what is found in the database " << foundItem->getPublicKey(); database::DatabaseManager::getInstance().removePublicKeyItem( item.getDeviceID()); } TEST_F(DatabaseManagerTest, PutAndFoundByReceiverMessageItemsDataIsSame) { const std::string receiverID = "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const database::MessageItem item( "bc0c1aa2-bf09-11ec-9d64-0242ac120002", "mobile:" "EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm", receiverID, "lYlNcO6RR4i9UW3G1DGjdJTRRGbqtPya2aj94ZRjIGZWoHwT5MB9ciAgnQf2VafYb9Tl" "8SZkX37tg4yZ9pOb4lqslY4g4h58OmWjumghVRvrPUZDalUuK8OLs1Qoengpu9wccxAk" "Bti2leDTNeiJDy36NnwS9aCIUc0ozsMvXfX1gWdBdmKbiRG1LvpNd6S7BNGG7Zly5zYj" "xz7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8G5" "RJ1NEDQq2q2FRfWjNHLhky92C2C7Nnfe4oVzSinfC1319uUkNLpSzI4MvEMi6g5Ukbl7" "iGhpnX7Hp4xpBL3h2IkvGviDRQ98UvW0ugwUuPxm1NOQpjLG5dPoqQ0jrMst0Bl5rgPw" "ajjNGsUWmp9r0ST0wRQXrQcY30PoSoqKSlCEgFMLzHWLrPQ86QFyCICismGSe7iBIqdD" "6d37StvXBzfJoZVU79UeOF2bFvb3DNoArEOe", "7s6ZUSDoFfZe3eJWQ15ngYhgMw1TsfbECnMVQTYvY6OyqWPBQi5wiftFcluoxor8"); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( item.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(item); std::vector> foundItems = database::DatabaseManager::getInstance().findMessageItemsByReceiver( receiverID); EXPECT_NE(foundItems.size(), 0); EXPECT_EQ(item.getFromDeviceID(), foundItems[0]->getFromDeviceID()); EXPECT_EQ(item.getToDeviceID(), foundItems[0]->getToDeviceID()); EXPECT_EQ(item.getPayload(), foundItems[0]->getPayload()); EXPECT_EQ(item.getBlobHashes(), foundItems[0]->getBlobHashes()); EXPECT_EQ( (foundItems[0]->getExpire() >= static_cast(std::time(0))) && (foundItems[0]->getExpire() <= static_cast(std::time(0) + MESSAGE_RECORD_TTL)), true); database::DatabaseManager::getInstance().removeMessageItem( - item.getMessageID()); + item.getToDeviceID(), item.getMessageID()); } TEST_F(DatabaseManagerTest, RemoveMessageItemsInBatch) { const size_t randomStringSize = 256; const std::string receiverID = "mobile:EMQNoQ7b2ueEmQ4QsevRWlXxFCNt055y20T1PHdoYAQRt0S6TLzZWNM6XSvdWqxm"; const database::MessageItem messageFirstToRemove( tools::generateUUID(), "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), receiverID, tools::generateRandomString(randomStringSize), tools::generateRandomString(randomStringSize)); const database::MessageItem messageSecondToRemove( tools::generateUUID(), "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), receiverID, tools::generateRandomString(randomStringSize), tools::generateRandomString(randomStringSize)); const database::MessageItem messageThirdToNotRemove( tools::generateUUID(), "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH), receiverID, tools::generateRandomString(randomStringSize), tools::generateRandomString(randomStringSize)); EXPECT_EQ( database::DatabaseManager::getInstance().isTableAvailable( messageFirstToRemove.getTableName()), true); database::DatabaseManager::getInstance().putMessageItem(messageFirstToRemove); database::DatabaseManager::getInstance().putMessageItem( messageSecondToRemove); database::DatabaseManager::getInstance().putMessageItem( messageThirdToNotRemove); std::vector> foundItems = database::DatabaseManager::getInstance().findMessageItemsByReceiver( receiverID); EXPECT_EQ(foundItems.size(), 3) << "Items count found by receiverID after insert is not equal to 3"; std::vector messageIDs = { messageFirstToRemove.getMessageID(), messageSecondToRemove.getMessageID()}; database::DatabaseManager::getInstance().removeMessageItemsByIDsForDeviceID( messageIDs, receiverID); foundItems = database::DatabaseManager::getInstance().findMessageItemsByReceiver( receiverID); // `messageThirdToNotRemove` must not be removed and must be persisted EXPECT_EQ(foundItems.size(), 1) << "Items found by receiverID is not equal to 1 after calling " "`removeMessageItemsByIDsForDeviceID`. The one message must be " "persisted."; database::DatabaseManager::getInstance().removeMessageItem( messageThirdToNotRemove.getMessageID()); }