Page MenuHomePhabricator

D9594.id32390.diff
No OneTemporary

D9594.id32390.diff

diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -1,7 +1,7 @@
pub mod session;
use crate::database::DatabaseClient;
-use crate::websockets::session::SessionError;
+use crate::websockets::session::initialize_amqp;
use crate::CONFIG;
use futures_util::stream::SplitSink;
use futures_util::StreamExt;
@@ -235,18 +235,24 @@
frame: Message,
db_client: DatabaseClient,
amqp_channel: lapin::Channel,
-) -> Result<WebsocketSession<S>, session::SessionError> {
- let session = session::WebsocketSession::from_frame(
- outgoing,
- db_client.clone(),
- frame,
- &amqp_channel,
- )
- .await
- .map_err(|_| {
- error!("Device failed to send valid connection request.");
- SessionError::InvalidMessage
- })?;
-
- Ok(session)
+) -> Result<
+ WebsocketSession<S>,
+ (
+ session::SessionError,
+ SplitSink<WebSocketStream<S>, Message>,
+ ),
+> {
+ let initialized_session =
+ initialize_amqp(db_client.clone(), frame, &amqp_channel).await;
+
+ match initialized_session {
+ Ok((device_info, amqp_consumer)) => Ok(WebsocketSession::new(
+ outgoing,
+ db_client,
+ device_info,
+ amqp_channel,
+ amqp_consumer,
+ )),
+ Err(e) => Err((e, outgoing)),
+ }
}
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -142,51 +142,54 @@
Ok(())
}
+pub async fn initialize_amqp(
+ db_client: DatabaseClient,
+ frame: Message,
+ amqp_channel: &lapin::Channel,
+) -> Result<(DeviceInfo, lapin::Consumer), SessionError> {
+ let device_info = match frame {
+ Message::Text(payload) => {
+ handle_first_message_from_device(&payload).await?
+ }
+ _ => {
+ error!("Client sent wrong frame type for establishing connection");
+ return Err(SessionError::InvalidMessage);
+ }
+ };
+
+ let mut args = FieldTable::default();
+ args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into());
+ amqp_channel
+ .queue_declare(&device_info.device_id, QueueDeclareOptions::default(), args)
+ .await?;
+
+ publish_persisted_messages(&db_client, amqp_channel, &device_info).await?;
+
+ let amqp_consumer = amqp_channel
+ .basic_consume(
+ &device_info.device_id,
+ RMQ_CONSUMER_TAG,
+ BasicConsumeOptions::default(),
+ FieldTable::default(),
+ )
+ .await?;
+ Ok((device_info, amqp_consumer))
+}
impl<S: AsyncRead + AsyncWrite + Unpin> WebsocketSession<S> {
- pub async fn from_frame(
+ pub fn new(
tx: SplitSink<WebSocketStream<S>, Message>,
db_client: DatabaseClient,
- frame: Message,
- amqp_channel: &lapin::Channel,
- ) -> Result<WebsocketSession<S>, SessionError> {
- let device_info = match frame {
- Message::Text(payload) => {
- handle_first_message_from_device(&payload).await?
- }
- _ => {
- error!("Client sent wrong frame type for establishing connection");
- return Err(SessionError::InvalidMessage);
- }
- };
-
- let mut args = FieldTable::default();
- args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into());
- amqp_channel
- .queue_declare(
- &device_info.device_id,
- QueueDeclareOptions::default(),
- args,
- )
- .await?;
-
- publish_persisted_messages(&db_client, amqp_channel, &device_info).await?;
-
- let amqp_consumer = amqp_channel
- .basic_consume(
- &device_info.device_id,
- RMQ_CONSUMER_TAG,
- BasicConsumeOptions::default(),
- FieldTable::default(),
- )
- .await?;
-
- Ok(WebsocketSession {
+ device_info: DeviceInfo,
+ amqp_channel: lapin::Channel,
+ amqp_consumer: lapin::Consumer,
+ ) -> Self {
+ Self {
tx,
db_client,
device_info,
- amqp_channel: amqp_channel.clone(),
+ amqp_channel,
amqp_consumer,
- })
+ }
}
pub async fn handle_message_to_device(

File Metadata

Mime Type
text/plain
Expires
Sun, Oct 6, 3:31 AM (22 h, 3 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2247548
Default Alt Text
D9594.id32390.diff (4 KB)

Event Timeline