diff --git a/services/tunnelbroker/src/Database/DatabaseManager.h b/services/tunnelbroker/src/Database/DatabaseManager.h --- a/services/tunnelbroker/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/src/Database/DatabaseManager.h @@ -47,6 +47,9 @@ std::vector> findMessageItemsByReceiver(const std::string &toDeviceID); void removeMessageItem(const std::string &messageID); + void removeMessageItemsOlderThenDeviceCheckpoint( + const std::string &toDeviceID, + const uint64_t &checkpointTime); }; } // namespace database diff --git a/services/tunnelbroker/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/Database/DatabaseManager.cpp --- a/services/tunnelbroker/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/Database/DatabaseManager.cpp @@ -209,6 +209,30 @@ this->innerRemoveItem(*item); } +void DatabaseManager::removeMessageItemsOlderThenDeviceCheckpoint( + const std::string &toDeviceID, + const uint64_t &checkpointTime) { + std::vector> messageItems = + this->findMessageItemsByReceiver(toDeviceID); + std::vector writeRequests; + + for (std::shared_ptr &messageItem : messageItems) { + if (messageItem->getCreatedAt() <= checkpointTime) { + Aws::DynamoDB::Model::DeleteRequest deleteRequest; + PrimaryKeyDescriptor pk = messageItem->getPrimaryKeyDescriptor(); + PrimaryKeyValue primaryKeyValue = messageItem->getPrimaryKeyValue(); + deleteRequest.AddKey( + pk.partitionKey, + Aws::DynamoDB::Model::AttributeValue(primaryKeyValue.partitionKey)); + Aws::DynamoDB::Model::WriteRequest curWriteRequest; + curWriteRequest.SetDeleteRequest(deleteRequest); + writeRequests.push_back(curWriteRequest); + } + } + this->innerBatchWriteItem( + MESSAGES_TABLE_NAME, MAX_DYNAMODB_BATCH_ITEMS, writeRequests); +} + } // namespace database } // namespace network } // namespace comm