diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf --- a/services/terraform/modules/shared/dynamodb.tf +++ b/services/terraform/modules/shared/dynamodb.tf @@ -94,6 +94,11 @@ name = "messageID" type = "S" } + + ttl { + attribute_name = "expirationTimeUnix" + enabled = true + } } resource "aws_dynamodb_table" "tunnelbroker-device-tokens" { diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -19,6 +19,10 @@ tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; pub const FCM_ACCESS_TOKEN_GENERATION_THRESHOLD: u64 = 5 * 60; +/// How long undelivered messages are kept in DynamoDB after a device data deletion request +pub const MESSAGE_TTL_AFTER_DELETION_REQUEST: chrono::Duration = + chrono::Duration::hours(24); + pub mod dynamodb { // This table holds messages which could not be immediately delivered to // a device. @@ -36,6 +40,7 @@ pub const PARTITION_KEY: &str = "deviceID"; pub const DEVICE_ID: &str = "deviceID"; pub const PAYLOAD: &str = "payload"; + pub const EXPIRATION_TIME: &str = "expirationTimeUnix"; pub const MESSAGE_ID: &str = "messageID"; pub const SORT_KEY: &str = "messageID"; } diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs --- a/services/tunnelbroker/src/database/mod.rs +++ b/services/tunnelbroker/src/database/mod.rs @@ -4,7 +4,7 @@ }; use comm_lib::aws::ddb::operation::put_item::PutItemError; use comm_lib::aws::ddb::operation::query::QueryError; -use comm_lib::aws::ddb::types::AttributeValue; +use comm_lib::aws::ddb::types::{AttributeValue, PutRequest, WriteRequest}; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{AttributeExtractor, AttributeMap, Error}; use std::collections::HashMap; @@ -12,6 +12,7 @@ use tracing::{debug, error, warn}; use crate::constants::dynamodb::{device_tokens, undelivered_messages}; 
+use crate::constants::MESSAGE_TTL_AFTER_DELETION_REQUEST;
 
 pub mod message;
 pub mod message_id;
@@ -128,6 +129,64 @@
       .await
   }
 
+  /// Marks every undelivered message stored for `device_id` for deletion
+  /// by stamping it with a DynamoDB TTL attribute.
+  ///
+  /// Each stored message is written back with
+  /// `undelivered_messages::EXPIRATION_TIME` set to the current UTC time plus
+  /// `MESSAGE_TTL_AFTER_DELETION_REQUEST`, as a Unix timestamp in seconds.
+  ///
+  /// NOTE(review): items are read and then put back in full, so a message
+  /// removed between the query and the batch write is re-created (with an
+  /// expiration time), and messages stored after the query are not marked.
+  ///
+  /// # Errors
+  ///
+  /// Returns an error if the messages cannot be retrieved or if the batch
+  /// write fails.
+  pub async fn mark_messages_to_device_for_deletion(
+    &self,
+    device_id: &str,
+  ) -> Result<(), Error> {
+    let messages = self.retrieve_messages(device_id).await.map_err(|e| {
+      error!("DynamoDB client failed to retrieve messages: {:?}", e);
+      Error::AwsSdk(e.into())
+    })?;
+
+    // Nothing is stored for this device, so there is nothing to write back.
+    if messages.is_empty() {
+      return Ok(());
+    }
+
+    let expires_at = chrono::Utc::now() + MESSAGE_TTL_AFTER_DELETION_REQUEST;
+    let expiration_attr = AttributeValue::N(expires_at.timestamp().to_string());
+
+    let update_requests = messages
+      .into_iter()
+      .map(|mut attrs| {
+        attrs.insert(
+          undelivered_messages::EXPIRATION_TIME.to_string(),
+          expiration_attr.clone(),
+        );
+        let put_request = PutRequest::builder()
+          .set_item(Some(attrs))
+          .build()
+          .expect("PutRequest build cannot fail: item is always set");
+        WriteRequest::builder().put_request(put_request).build()
+      })
+      .collect();
+
+    comm_lib::database::batch_operations::batch_write(
+      &self.client,
+      undelivered_messages::TABLE_NAME,
+      update_requests,
+      Default::default(),
+    )
+    .await?;
+
+    Ok(())
+  }
+
   pub async fn remove_device_token(
     &self,
     device_id: &str,
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -86,6 +86,12 @@
       .await
       .map_err(|_| tonic::Status::failed_precondition("unexpected error"))?;
 
+    self
+      .client
+      .mark_messages_to_device_for_deletion(&message.device_id)
+      .await
+      .map_err(|_| tonic::Status::failed_precondition("unexpected error"))?;
+
     let response = tonic::Response::new(Empty {});
     Ok(response)
   }