diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -5,6 +5,11 @@ pub const GRPC_KEEP_ALIVE_PING_INTERVAL: Duration = Duration::from_secs(3); pub const GRPC_KEEP_ALIVE_PING_TIMEOUT: Duration = Duration::from_secs(10); +pub const MAX_RMQ_MSG_PRIORITY: u8 = 10; +pub const DDB_RMQ_MSG_PRIORITY: u8 = 10; +pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1; +pub const RMQ_CONSUMER_TAG: &str = "tunnelbroker"; + pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -10,6 +10,7 @@ use tonic::transport::Server; use tracing::debug; +use crate::constants::CLIENT_RMQ_MSG_PRIORITY; use crate::database::{handle_ddb_error, DatabaseClient}; use crate::{constants, CONFIG}; @@ -52,7 +53,7 @@ &message.device_id, BasicPublishOptions::default(), message.payload.as_bytes(), - BasicProperties::default(), + BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), ) .await .map_err(handle_amqp_error)?; diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -233,7 +233,7 @@ db_client: DatabaseClient, amqp_channel: lapin::Channel, ) -> Result, session::SessionError> { - let mut session = session::WebsocketSession::from_frame( + let session = session::WebsocketSession::from_frame( outgoing, db_client.clone(), frame, @@ -245,7 +245,5 @@ SessionError::InvalidMessage })?; - session::consume_error(session.deliver_persisted_messages().await); - Ok(session) } diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -1,3 +1,7 @@ +use crate::constants::{ + CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY, MAX_RMQ_MSG_PRIORITY, + RMQ_CONSUMER_TAG, +}; use aws_sdk_dynamodb::error::SdkError; use aws_sdk_dynamodb::operation::put_item::PutItemError; use derive_more; @@ -22,7 +26,6 @@ }; use crate::database::{self, DatabaseClient, DeviceMessage}; -use crate::error::Error; use crate::identity; pub struct DeviceInfo { @@ -55,16 +58,10 @@ PersistenceError(SdkError), } -pub fn consume_error(result: Result) { - if let Err(e) = result { - error!("{}", e) - } -} - // Parse a session request and retrieve the device information pub async fn handle_first_message_from_device( message: &str, -) -> Result { +) -> Result { let serialized_message = serde_json::from_str::(message)?; match serialized_message { @@ -89,11 +86,11 @@ match auth_request { Err(e) => { error!("Failed to complete request to identity service: {:?}", e); - return Err(SessionError::InternalError.into()); + return Err(SessionError::InternalError); } Ok(false) => { info!("Device failed authentication: {}", &session_info.device_id); - return Err(SessionError::UnauthorizedDevice.into()); + return Err(SessionError::UnauthorizedDevice); } Ok(true) => { debug!( @@ -107,42 +104,82 @@ } _ => { debug!("Received invalid request"); - Err(SessionError::InvalidMessage.into()) + Err(SessionError::InvalidMessage) } } } +async fn publish_persisted_messages( + db_client: &DatabaseClient, + amqp_channel: &lapin::Channel, + device_info: &DeviceInfo, +) -> Result<(), SessionError> { + let messages = db_client + .retrieve_messages(&device_info.device_id) + .await + .unwrap_or_else(|e| { + error!("Error while retrieving messages: {}", e); + Vec::new() + }); + + for message in messages { + let device_message = DeviceMessage::from_hashmap(message)?; + + amqp_channel + .basic_publish( + "", + &device_message.device_id, + BasicPublishOptions::default(), + device_message.payload.as_bytes(), + BasicProperties::default().with_priority(DDB_RMQ_MSG_PRIORITY), + ) + .await?; + + if let Err(e) = db_client + .delete_message(&device_info.device_id, &device_message.message_id) + .await + { + error!("Failed to delete message: {}:", e); + } + } + + debug!("Flushed messages for device: {}", &device_info.device_id); + Ok(()) +} + impl WebsocketSession { pub async fn from_frame( tx: SplitSink, Message>, db_client: DatabaseClient, frame: Message, amqp_channel: &lapin::Channel, - ) -> Result, Error> { + ) -> Result, SessionError> { let device_info = match frame { Message::Text(payload) => { handle_first_message_from_device(&payload).await? } _ => { error!("Client sent wrong frame type for establishing connection"); - return Err(SessionError::InvalidMessage.into()); + return Err(SessionError::InvalidMessage); } }; - // We don't currently have a use case to interact directly with the queue, - // however, we need to declare a queue for a given device + let mut args = FieldTable::default(); + args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into()); amqp_channel .queue_declare( &device_info.device_id, QueueDeclareOptions::default(), - FieldTable::default(), + args, ) .await?; + publish_persisted_messages(&db_client, amqp_channel, &device_info).await?; + let amqp_consumer = amqp_channel .basic_consume( &device_info.device_id, - "tunnelbroker", + RMQ_CONSUMER_TAG, BasicConsumeOptions::default(), FieldTable::default(), ) @@ -177,7 +214,7 @@ &message_request.device_id, BasicPublishOptions::default(), message_request.payload.as_bytes(), - BasicProperties::default(), + BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), ) .await; @@ -223,41 +260,6 @@ self.amqp_consumer.next().await } - pub async fn deliver_persisted_messages( - &mut self, - ) -> Result<(), SessionError> { - // Check for persisted messages - let messages = self - .db_client - .retrieve_messages(&self.device_info.device_id) - .await - .unwrap_or_else(|e| { - error!("Error while retrieving messages: {}", e); - Vec::new() - }); - - for message in messages { - let device_message = DeviceMessage::from_hashmap(message)?; - self - .send_message_to_device(Message::Text(device_message.payload)) - .await; - if let Err(e) = self - .db_client - .delete_message(&self.device_info.device_id, &device_message.message_id) - .await - { - error!("Failed to delete message: {}:", e); - } - } - - debug!( - "Flushed messages for device: {}", - &self.device_info.device_id - ); - - Ok(()) - } - pub async fn send_message_to_device(&mut self, message: Message) { if let Err(e) = self.tx.send(message).await { error!("Failed to send message to device: {}", e);