diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs index 557728d5e..8bab5fc70 100644 --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -1,38 +1,52 @@ use tokio::time::Duration; pub const GRPC_TX_QUEUE_SIZE: usize = 32; pub const GRPC_SERVER_PORT: u16 = 50051; pub const GRPC_KEEP_ALIVE_PING_INTERVAL: Duration = Duration::from_secs(3); pub const GRPC_KEEP_ALIVE_PING_TIMEOUT: Duration = Duration::from_secs(10); pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); pub const MAX_RMQ_MSG_PRIORITY: u8 = 10; pub const DDB_RMQ_MSG_PRIORITY: u8 = 10; pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1; pub const RMQ_CONSUMER_TAG: &str = "tunnelbroker"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; pub mod dynamodb { // This table holds messages which could not be immediately delivered to // a device. // // - (primary key) = (deviceID: Partition Key, createdAt: Sort Key) // - deviceID: The public key of a device's olm identity key // - payload: Message to be delivered. See shared/tunnelbroker_messages. // - messageID = [createdAt]#[clientMessageID] // - createdAd: UNIX timestamp of when the item was inserted. // Timestamp is needed to order the messages correctly to the device. // Timestamp format is ISO 8601 to handle lexicographical sorting. // - clientMessageID: Message ID generated on client using UUID Version 4. pub mod undelivered_messages { pub const TABLE_NAME: &str = "tunnelbroker-undelivered-messages"; pub const PARTITION_KEY: &str = "deviceID"; pub const DEVICE_ID: &str = "deviceID"; pub const PAYLOAD: &str = "payload"; pub const MESSAGE_ID: &str = "messageID"; pub const SORT_KEY: &str = "messageID"; } + + // This table holds a device token associated with a device. + // + // - (primary key) = (deviceID: Partition Key) + // - deviceID: The public key of a device's olm identity key + // - deviceToken: Token to push services uploaded by device. + pub mod device_tokens { + pub const TABLE_NAME: &str = "tunnelbroker-device-tokens"; + pub const PARTITION_KEY: &str = "deviceID"; + pub const DEVICE_ID: &str = "deviceID"; + pub const DEVICE_TOKEN: &str = "deviceToken"; + + pub const DEVICE_TOKEN_INDEX_NAME: &str = "deviceToken-index"; + } } diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs index 9447869ba..9aa2084ad 100644 --- a/services/tunnelbroker/src/database/mod.rs +++ b/services/tunnelbroker/src/database/mod.rs @@ -1,129 +1,130 @@ use comm_lib::aws::ddb::error::SdkError; use comm_lib::aws::ddb::operation::delete_item::{ DeleteItemError, DeleteItemOutput, }; use comm_lib::aws::ddb::operation::put_item::PutItemError; use comm_lib::aws::ddb::operation::query::QueryError; use comm_lib::aws::ddb::types::AttributeValue; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::AttributeMap; use std::collections::HashMap; use std::sync::Arc; use tracing::{debug, error}; -use crate::constants::dynamodb::undelivered_messages::{ - PARTITION_KEY, PAYLOAD, SORT_KEY, TABLE_NAME, -}; +use crate::constants::dynamodb::undelivered_messages; pub mod message; pub mod message_id; use crate::database::message_id::MessageID; pub use message::*; #[derive(Clone)] pub struct DatabaseClient { client: Arc, } pub fn handle_ddb_error(db_error: SdkError) -> tonic::Status { match db_error { SdkError::TimeoutError(_) | SdkError::ServiceError(_) => { tonic::Status::unavailable("please retry") } e => { error!("Encountered an unexpected error: {}", e); tonic::Status::failed_precondition("unexpected error") } } } impl DatabaseClient { pub fn new(aws_config: &AwsConfig) -> Self { let client = DynamoDBClient::new(aws_config); DatabaseClient { client: Arc::new(client), } } pub async fn persist_message( &self, device_id: &str, payload: &str, client_message_id: &str, ) -> Result> { let message_id: String = MessageID::new(client_message_id.to_string()).into(); let device_av = AttributeValue::S(device_id.to_string()); let payload_av = AttributeValue::S(payload.to_string()); let message_id_av = AttributeValue::S(message_id.clone()); let request = self .client .put_item() - .table_name(TABLE_NAME) - .item(PARTITION_KEY, device_av) - .item(SORT_KEY, message_id_av) - .item(PAYLOAD, payload_av); + .table_name(undelivered_messages::TABLE_NAME) + .item(undelivered_messages::PARTITION_KEY, device_av) + .item(undelivered_messages::SORT_KEY, message_id_av) + .item(undelivered_messages::PAYLOAD, payload_av); debug!("Persisting message to device: {}", &device_id); request.send().await?; Ok(message_id) } pub async fn retrieve_messages( &self, device_id: &str, ) -> Result, SdkError> { debug!("Retrieving messages for device: {}", device_id); let response = self .client .query() - .table_name(TABLE_NAME) - .key_condition_expression(format!("{} = :u", PARTITION_KEY)) + .table_name(undelivered_messages::TABLE_NAME) + .key_condition_expression(format!( + "{} = :u", + undelivered_messages::PARTITION_KEY + )) .expression_attribute_values( ":u", AttributeValue::S(device_id.to_string()), ) .consistent_read(true) .send() .await?; debug!("Retrieved {} messages for {}", response.count, device_id); match response.items { None => Ok(Vec::new()), Some(items) => Ok(items.to_vec()), } } pub async fn delete_message( &self, device_id: &str, message_id: &str, ) -> Result> { debug!("Deleting message for device: {}", device_id); let key = HashMap::from([ ( - PARTITION_KEY.to_string(), + undelivered_messages::PARTITION_KEY.to_string(), AttributeValue::S(device_id.to_string()), ), ( - SORT_KEY.to_string(), + undelivered_messages::SORT_KEY.to_string(), AttributeValue::S(message_id.to_string()), ), ]); self .client .delete_item() - .table_name(TABLE_NAME) + .table_name(undelivered_messages::TABLE_NAME) .set_key(Some(key)) .send() .await } }