diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp index 96592a664..1824c9281 100644 --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -1,270 +1,273 @@ #include "DatabaseManager.h" #include "DynamoDBTools.h" #include "GlobalTools.h" namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } bool DatabaseManager::isTableAvailable(const std::string &tableName) { Aws::DynamoDB::Model::DescribeTableRequest request; request.SetTableName(tableName); // Check table availability by invoking DescribeTable const Aws::DynamoDB::Model::DescribeTableOutcome &result = getDynamoDBClient()->DescribeTable(request); return result.IsSuccess(); } void DatabaseManager::putSessionItem(const DeviceSessionItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(item.getSessionID())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( DeviceSessionItem::FIELD_PUBKEY, Aws::DynamoDB::Model::AttributeValue(item.getPubKey())); request.AddItem( DeviceSessionItem::FIELD_NOTIFY_TOKEN, Aws::DynamoDB::Model::AttributeValue(item.getNotifyToken())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_TYPE, Aws::DynamoDB::Model::AttributeValue(item.getDeviceType())); request.AddItem( DeviceSessionItem::FIELD_APP_VERSION, Aws::DynamoDB::Model::AttributeValue(item.getAppVersion())); request.AddItem( DeviceSessionItem::FIELD_DEVICE_OS, Aws::DynamoDB::Model::AttributeValue(item.getDeviceOs())); request.AddItem( DeviceSessionItem::FIELD_CHECKPOINT_TIME, Aws::DynamoDB::Model::AttributeValue( std::to_string(item.getCheckpointTime()))); request.AddItem( DeviceSessionItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_RECORD_TTL))); + request.AddItem( + DeviceSessionItem::FIELD_IS_ONLINE, + Aws::DynamoDB::Model::AttributeValue().SetBool(false)); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionItem(const std::string &sessionID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( DeviceSessionItem::FIELD_SESSION_ID, Aws::DynamoDB::Model::AttributeValue(sessionID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionItem(const std::string &sessionID) { std::shared_ptr item = this->findSessionItem(sessionID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putSessionSignItem(const SessionSignItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( SessionSignItem::FIELD_SESSION_VERIFICATION, Aws::DynamoDB::Model::AttributeValue(item.getSign())); request.AddItem( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( SessionSignItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0)) + SESSION_SIGN_RECORD_TTL))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findSessionSignItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( SessionSignItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeSessionSignItem(const std::string &deviceID) { std::shared_ptr item = this->findSessionSignItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putPublicKeyItem(const PublicKeyItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(item.getTableName()); request.AddItem( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getDeviceID())); request.AddItem( PublicKeyItem::FIELD_PUBLIC_KEY, Aws::DynamoDB::Model::AttributeValue(item.getPublicKey())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findPublicKeyItem(const std::string &deviceID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( PublicKeyItem::FIELD_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(deviceID)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removePublicKeyItem(const std::string &deviceID) { std::shared_ptr item = this->findPublicKeyItem(deviceID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } template T DatabaseManager::populatePutRequestFromMessageItem( T &putRequest, const MessageItem &item) { putRequest.AddItem( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(item.getMessageID())); putRequest.AddItem( MessageItem::FIELD_FROM_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getFromDeviceID())); putRequest.AddItem( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(item.getToDeviceID())); putRequest.AddItem( MessageItem::FIELD_PAYLOAD, Aws::DynamoDB::Model::AttributeValue(item.getPayload())); putRequest.AddItem( MessageItem::FIELD_BLOB_HASHES, Aws::DynamoDB::Model::AttributeValue(item.getBlobHashes())); putRequest.AddItem( MessageItem::FIELD_EXPIRE, Aws::DynamoDB::Model::AttributeValue(std::to_string( static_cast(std::time(0) + MESSAGE_RECORD_TTL)))); putRequest.AddItem( MessageItem::FIELD_CREATED_AT, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); return putRequest; } void DatabaseManager::putMessageItem(const MessageItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request = this->populatePutRequestFromMessageItem(request, item); request.SetTableName(item.getTableName()); this->innerPutItem(std::make_shared(item), request); } void DatabaseManager::putMessageItemsByBatch( std::vector &messageItems) { std::vector writeRequests; for (MessageItem &messageItem : messageItems) { Aws::DynamoDB::Model::PutRequest putRequest; putRequest = this->populatePutRequestFromMessageItem(putRequest, messageItem); Aws::DynamoDB::Model::WriteRequest writeRequest; writeRequest.SetPutRequest(putRequest); writeRequests.push_back(writeRequest); } this->innerBatchWriteItem( messageItems[0].getTableName(), DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } std::shared_ptr DatabaseManager::findMessageItem( const std::string &toDeviceID, const std::string &messageID) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(toDeviceID)); request.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); return std::move(this->innerFindItem(request)); } std::vector> DatabaseManager::findMessageItemsByReceiver(const std::string &toDeviceID) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(MessageItem().getTableName()); req.SetKeyConditionExpression( MessageItem::FIELD_TO_DEVICE_ID + " = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", toDeviceID); req.SetExpressionAttributeValues(attributeValues); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeMessageItem( const std::string &toDeviceID, const std::string &messageID) { std::shared_ptr item = this->findMessageItem(toDeviceID, messageID); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::removeMessageItemsByIDsForDeviceID( std::vector &messageIDs, const std::string &toDeviceID) { std::vector writeRequests; for (std::string &messageID : messageIDs) { Aws::DynamoDB::Model::DeleteRequest deleteRequest; deleteRequest.AddKey( MessageItem::FIELD_TO_DEVICE_ID, Aws::DynamoDB::Model::AttributeValue(toDeviceID)); deleteRequest.AddKey( MessageItem::FIELD_MESSAGE_ID, Aws::DynamoDB::Model::AttributeValue(messageID)); Aws::DynamoDB::Model::WriteRequest currentWriteRequest; currentWriteRequest.SetDeleteRequest(deleteRequest); writeRequests.push_back(currentWriteRequest); } this->innerBatchWriteItem( MessageItem().getTableName(), DYNAMODB_MAX_BATCH_ITEMS, DYNAMODB_BACKOFF_FIRST_RETRY_DELAY, DYNAMODB_MAX_BACKOFF_TIME, writeRequests); } } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DeviceSessionItem.cpp b/services/tunnelbroker/src/Database/DeviceSessionItem.cpp index 7142a37f1..76f44e93c 100644 --- a/services/tunnelbroker/src/Database/DeviceSessionItem.cpp +++ b/services/tunnelbroker/src/Database/DeviceSessionItem.cpp @@ -1,128 +1,135 @@ #include "DeviceSessionItem.h" #include "ConfigManager.h" #include "Tools.h" #include namespace comm { namespace network { namespace database { const std::string DeviceSessionItem::FIELD_SESSION_ID = "SessionID"; const std::string DeviceSessionItem::FIELD_DEVICE_ID = "DeviceID"; const std::string DeviceSessionItem::FIELD_PUBKEY = "PubKey"; const std::string DeviceSessionItem::FIELD_NOTIFY_TOKEN = "NotifyToken"; const std::string DeviceSessionItem::FIELD_DEVICE_TYPE = "DeviceType"; const std::string DeviceSessionItem::FIELD_APP_VERSION = "AppVersion"; const std::string DeviceSessionItem::FIELD_DEVICE_OS = "DeviceOS"; const std::string DeviceSessionItem::FIELD_CHECKPOINT_TIME = "CheckpointTime"; const std::string DeviceSessionItem::FIELD_EXPIRE = "Expire"; +const std::string DeviceSessionItem::FIELD_IS_ONLINE = "IsOnline"; DeviceSessionItem::DeviceSessionItem( const std::string sessionID, const std::string deviceID, const std::string pubKey, const std::string notifyToken, const std::string deviceType, const std::string appVersion, const std::string deviceOs) : sessionID(sessionID), deviceID(deviceID), pubKey(pubKey), notifyToken(notifyToken), deviceType(deviceType), appVersion(appVersion), deviceOs(deviceOs) { this->validate(); } DeviceSessionItem::DeviceSessionItem(const AttributeValues &itemFromDB) { this->assignItemFromDatabase(itemFromDB); } void DeviceSessionItem::validate() const { if (!tools::validateSessionID(this->sessionID)) { throw std::runtime_error("Error: SessionID format is wrong."); } if (!tools::validateDeviceID(this->deviceID)) { throw std::runtime_error("Error: DeviceID format is wrong."); } tools::checkIfNotEmpty("pubKey", this->pubKey); tools::checkIfNotEmpty("notifyToken", this->notifyToken); tools::checkIfNotEmpty("deviceType", this->deviceType); tools::checkIfNotEmpty("appVersion", this->appVersion); tools::checkIfNotEmpty("deviceOs", this->deviceOs); } void DeviceSessionItem::assignItemFromDatabase( const AttributeValues &itemFromDB) { try { this->sessionID = itemFromDB.at(DeviceSessionItem::FIELD_SESSION_ID).GetS(); this->deviceID = itemFromDB.at(DeviceSessionItem::FIELD_DEVICE_ID).GetS(); this->pubKey = itemFromDB.at(DeviceSessionItem::FIELD_PUBKEY).GetS(); this->notifyToken = itemFromDB.at(DeviceSessionItem::FIELD_NOTIFY_TOKEN).GetS(); this->deviceType = itemFromDB.at(DeviceSessionItem::FIELD_DEVICE_TYPE).GetS(); this->appVersion = itemFromDB.at(DeviceSessionItem::FIELD_APP_VERSION).GetS(); this->deviceOs = itemFromDB.at(DeviceSessionItem::FIELD_DEVICE_OS).GetS(); this->checkpointTime = std::stoll( std::string( itemFromDB.at(DeviceSessionItem::FIELD_CHECKPOINT_TIME).GetS()) .c_str()); + this->isOnline = + itemFromDB.at(DeviceSessionItem::FIELD_IS_ONLINE).GetBool(); } catch (std::logic_error &e) { throw std::runtime_error( "Invalid device session database value " + std::string(e.what())); } this->validate(); } std::string DeviceSessionItem::getTableName() const { return config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE); } PrimaryKeyDescriptor DeviceSessionItem::getPrimaryKeyDescriptor() const { return PrimaryKeyDescriptor(DeviceSessionItem::FIELD_SESSION_ID); } PrimaryKeyValue DeviceSessionItem::getPrimaryKeyValue() const { return PrimaryKeyValue(this->sessionID); } std::string DeviceSessionItem::getSessionID() const { return this->sessionID; } std::string DeviceSessionItem::getDeviceID() const { return this->deviceID; } std::string DeviceSessionItem::getPubKey() const { return this->pubKey; } std::string DeviceSessionItem::getNotifyToken() const { return this->notifyToken; } std::string DeviceSessionItem::getDeviceType() const { return this->deviceType; } std::string DeviceSessionItem::getAppVersion() const { return this->appVersion; } std::string DeviceSessionItem::getDeviceOs() const { return this->deviceOs; } int64_t DeviceSessionItem::getCheckpointTime() const { return this->checkpointTime; } +bool DeviceSessionItem::getIsOnline() const { + return this->isOnline; +} + } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DeviceSessionItem.h b/services/tunnelbroker/src/Database/DeviceSessionItem.h index 2d9073099..fd8975306 100644 --- a/services/tunnelbroker/src/Database/DeviceSessionItem.h +++ b/services/tunnelbroker/src/Database/DeviceSessionItem.h @@ -1,62 +1,65 @@ #pragma once #include "Item.h" #include namespace comm { namespace network { namespace database { class DeviceSessionItem : public Item { std::string sessionID; std::string deviceID; std::string pubKey; std::string notifyToken; std::string deviceType; std::string appVersion; std::string deviceOs; int64_t checkpointTime = 0; + bool isOnline = false; void validate() const override; public: static const std::string FIELD_SESSION_ID; static const std::string FIELD_DEVICE_ID; static const std::string FIELD_PUBKEY; static const std::string FIELD_NOTIFY_TOKEN; static const std::string FIELD_DEVICE_TYPE; static const std::string FIELD_APP_VERSION; static const std::string FIELD_DEVICE_OS; static const std::string FIELD_CHECKPOINT_TIME; static const std::string FIELD_EXPIRE; + static const std::string FIELD_IS_ONLINE; PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override; PrimaryKeyValue getPrimaryKeyValue() const override; std::string getTableName() const override; std::string getSessionID() const; std::string getDeviceID() const; std::string getPubKey() const; std::string getNotifyToken() const; std::string getDeviceType() const; std::string getAppVersion() const; std::string getDeviceOs() const; int64_t getCheckpointTime() const; + bool getIsOnline() const; DeviceSessionItem() { } DeviceSessionItem( const std::string sessionID, const std::string deviceID, const std::string pubKey, const std::string notifyToken, const std::string deviceType, const std::string appVersion, const std::string deviceOs); DeviceSessionItem(const AttributeValues &itemFromDB); void assignItemFromDatabase(const AttributeValues &itemFromDB) override; }; } // namespace database } // namespace network } // namespace comm