diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -1,7 +1,7 @@ pub mod session; use crate::database::DatabaseClient; -use crate::websockets::session::SessionError; +use crate::websockets::session::initialize_amqp; use crate::CONFIG; use futures_util::stream::SplitSink; use futures_util::StreamExt; @@ -235,18 +235,24 @@ frame: Message, db_client: DatabaseClient, amqp_channel: lapin::Channel, -) -> Result, session::SessionError> { - let session = session::WebsocketSession::from_frame( - outgoing, - db_client.clone(), - frame, - &amqp_channel, - ) - .await - .map_err(|_| { - error!("Device failed to send valid connection request."); - SessionError::InvalidMessage - })?; - - Ok(session) +) -> Result< + WebsocketSession, + ( + session::SessionError, + SplitSink, Message>, + ), +> { + let initialized_session = + initialize_amqp(db_client.clone(), frame, &amqp_channel).await; + + match initialized_session { + Ok((device_info, amqp_consumer)) => Ok(WebsocketSession::new( + outgoing, + db_client, + device_info, + amqp_channel, + amqp_consumer, + )), + Err(e) => Err((e, outgoing)), + } } diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -142,51 +142,54 @@ Ok(()) } +pub async fn initialize_amqp( + db_client: DatabaseClient, + frame: Message, + amqp_channel: &lapin::Channel, +) -> Result<(DeviceInfo, lapin::Consumer), SessionError> { + let device_info = match frame { + Message::Text(payload) => { + handle_first_message_from_device(&payload).await? + } + _ => { + error!("Client sent wrong frame type for establishing connection"); + return Err(SessionError::InvalidMessage); + } + }; + + let mut args = FieldTable::default(); + args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into()); + amqp_channel + .queue_declare(&device_info.device_id, QueueDeclareOptions::default(), args) + .await?; + + publish_persisted_messages(&db_client, amqp_channel, &device_info).await?; + + let amqp_consumer = amqp_channel + .basic_consume( + &device_info.device_id, + RMQ_CONSUMER_TAG, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + Ok((device_info, amqp_consumer)) +} impl WebsocketSession { - pub async fn from_frame( + pub fn new( tx: SplitSink, Message>, db_client: DatabaseClient, - frame: Message, - amqp_channel: &lapin::Channel, - ) -> Result, SessionError> { - let device_info = match frame { - Message::Text(payload) => { - handle_first_message_from_device(&payload).await? - } - _ => { - error!("Client sent wrong frame type for establishing connection"); - return Err(SessionError::InvalidMessage); - } - }; - - let mut args = FieldTable::default(); - args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into()); - amqp_channel - .queue_declare( - &device_info.device_id, - QueueDeclareOptions::default(), - args, - ) - .await?; - - publish_persisted_messages(&db_client, amqp_channel, &device_info).await?; - - let amqp_consumer = amqp_channel - .basic_consume( - &device_info.device_id, - RMQ_CONSUMER_TAG, - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await?; - - Ok(WebsocketSession { + device_info: DeviceInfo, + amqp_channel: lapin::Channel, + amqp_consumer: lapin::Consumer, + ) -> Self { + Self { tx, db_client, device_info, - amqp_channel: amqp_channel.clone(), + amqp_channel, amqp_consumer, - }) + } } pub async fn handle_message_to_device(