// Patch overview (three paths under services/tunnelbroker/src):
//
// 1. database/message.rs (new file)
//    - Adds `DeviceMessage`, a struct with three `String` fields:
//      `device_id`, `created_at` and `payload`.
//    - Adds the error enum `MessageErrors` with the single variant
//      `SerializationError`.
//    - Adds `DeviceMessage::from_hashmap`, which looks up the DEVICE_ID,
//      CREATED_AT and PAYLOAD keys in the given map. DEVICE_ID and PAYLOAD
//      are read with `as_s()`, CREATED_AT with `as_n()`; each value is then
//      copied with `to_string()`. A missing key or a failed `as_s()`/`as_n()`
//      call yields `MessageErrors::SerializationError`.
//
// 2. database.rs -> database/mod.rs (rename)
//    - Declares `pub mod message;` and re-exports it with
//      `pub use message::*;`.
//
// 3. websockets/session.rs
//    - Adds the variant `MessageError(database::MessageErrors)` to
//      `SessionError`.
//    - In the message loop, replaces the `get(..).unwrap().as_s().unwrap()`
//      chain with `DeviceMessage::from_hashmap(message)?`.
//    - Replaces `.expect("Failed to delete messages")` on `delete_message`
//      with an `if let Err(e)` branch that logs through `error!`, so a failed
//      delete no longer panics.
//
// NOTE(review): the `?` applied to `from_hashmap` needs a conversion from
// `MessageErrors` into the enclosing function's error type; that conversion
// is not visible in this patch -- confirm it exists.
// NOTE(review): the format string "Failed to delete message: {}:" ends with
// a colon after the placeholder; this looks like a typo -- confirm.
// NOTE(review): this text appears to have lost its line breaks, and generic
// parameters seem to be missing (`hashmap: HashMap,`, `-> Result {`,
// `client: Arc,`, `result: Result`). Presumably an extraction artifact;
// verify against the original patch before applying.
diff --git a/services/tunnelbroker/src/database/message.rs b/services/tunnelbroker/src/database/message.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/database/message.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; + +use aws_sdk_dynamodb::types::AttributeValue; + +use crate::constants::dynamodb::undelivered_messages::{ + CREATED_AT, DEVICE_ID, PAYLOAD, +}; + +#[derive(Debug)] +pub struct DeviceMessage { + pub device_id: String, + pub created_at: String, + pub payload: String, +} + +#[derive(Debug, derive_more::Display)] +pub enum MessageErrors { + SerializationError, +} + +impl DeviceMessage { + pub fn from_hashmap( + hashmap: HashMap, + ) -> Result { + let device_id: String = hashmap + .get(DEVICE_ID) + .ok_or(MessageErrors::SerializationError)? + .as_s() + .map_err(|_| MessageErrors::SerializationError)? + .to_string(); + let created_at: String = hashmap + .get(CREATED_AT) + .ok_or(MessageErrors::SerializationError)? + .as_n() + .map_err(|_| MessageErrors::SerializationError)? + .to_string(); + let payload: String = hashmap + .get(PAYLOAD) + .ok_or(MessageErrors::SerializationError)? + .as_s() + .map_err(|_| MessageErrors::SerializationError)? 
+ .to_string(); + + Ok(DeviceMessage { + device_id, + created_at, + payload, + }) + } +} diff --git a/services/tunnelbroker/src/database.rs b/services/tunnelbroker/src/database/mod.rs rename from services/tunnelbroker/src/database.rs rename to services/tunnelbroker/src/database/mod.rs --- a/services/tunnelbroker/src/database.rs +++ b/services/tunnelbroker/src/database/mod.rs @@ -15,6 +15,9 @@ PARTITION_KEY, PAYLOAD, SORT_KEY, TABLE_NAME, }; +pub mod message; +pub use message::*; + #[derive(Clone)] pub struct DatabaseClient { client: Arc, diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -7,8 +7,8 @@ use tunnelbroker_messages::{session::DeviceTypes, Messages}; use crate::{ - constants::dynamodb::undelivered_messages::CREATED_AT, - database::DatabaseClient, ACTIVE_CONNECTIONS, + database::{self, DatabaseClient, DeviceMessage}, + ACTIVE_CONNECTIONS, }; pub struct DeviceInfo { @@ -29,6 +29,7 @@ pub enum SessionError { InvalidMessage, SerializationError(serde_json::Error), + MessageError(database::MessageErrors), } fn consume_error(result: Result) { @@ -104,16 +105,15 @@ ACTIVE_CONNECTIONS.insert(device_info.device_id.clone(), tx.clone()); for message in messages { - let payload = - message.get("payload").unwrap().as_s().unwrap().to_string(); - let created_at = - message.get(CREATED_AT).unwrap().as_n().unwrap().to_string(); - self.send_message_to_device(payload).await; - self + let device_message = DeviceMessage::from_hashmap(message)?; + self.send_message_to_device(device_message.payload).await; + if let Err(e) = self .db_client - .delete_message(&device_info.device_id, &created_at) + .delete_message(&device_info.device_id, &device_message.created_at) .await - .expect("Failed to delete messages"); + { + error!("Failed to delete message: {}:", e); + } } debug!("Flushed messages for device: {}", 
&session_info.device_id);