diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -330,7 +330,9 @@ session.close().await } -async fn initiate_session( +async fn initiate_session< + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +>( outgoing: SplitSink, Message>, frame: Message, db_client: DatabaseClient, diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -7,10 +7,12 @@ use futures_util::SinkExt; use hyper_tungstenite::{tungstenite::Message, WebSocketStream}; use lapin::message::Delivery; +use std::sync::Arc; use reqwest::Url; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::sync::Mutex; use tracing::{debug, error, info, trace}; use crate::amqp_client::AmqpClient; @@ -39,7 +41,7 @@ } pub struct WebsocketSession { - tx: SplitSink, Message>, + tx: Arc, Message>>>, db_client: DatabaseClient, pub device_info: DeviceInfo, // Each websocket has an AMQP connection associated with a particular device @@ -160,7 +162,7 @@ Ok(device_info) } -impl WebsocketSession { +impl WebsocketSession { pub async fn new( tx: SplitSink, Message>, db_client: DatabaseClient, @@ -177,7 +179,7 @@ }; Ok(Self { - tx, + tx: Arc::new(Mutex::new(tx)), db_client, device_info, amqp_client, @@ -260,8 +262,34 @@ match serialized_message { DeviceToTunnelbrokerMessage::FarcasterAPIRequest(request) => { - let response = self.handle_websocket_farcaster_request(request).await; - serde_json::to_string(&response).ok() + // Spawn Farcaster API request as a background task for parallel processing + let tx = self.tx.clone(); + let farcaster_client = self.farcaster_client.clone(); + let device_info = self.device_info.clone(); + + tokio::spawn(async move { + let response = Self::handle_websocket_farcaster_request_static( + &farcaster_client, + &device_info, + request, + ) + .await; + + if let Ok(response_json) = serde_json::to_string(&response) { + let mut tx_guard = tx.lock().await; + if let Err(e) = tx_guard.send(Message::Text(response_json)).await { + if !should_ignore_error(&e) { + error!( + errorType = error_types::WEBSOCKET_ERROR, + "Failed to send Farcaster API response to device: {}", e + ); + } + } + } + }); + + // Return immediately without waiting for the API response + None } message_from_device => { let message_status = @@ -411,16 +439,17 @@ } } - async fn handle_websocket_farcaster_request( - &mut self, + async fn handle_websocket_farcaster_request_static( + farcaster_client: &FarcasterClient, + device_info: &DeviceInfo, request: FarcasterAPIRequest, ) -> FarcasterAPIResponse { let request_id = request.request_id.clone(); - if !self.device_info.is_authenticated { + if !device_info.is_authenticated { debug!( "Unauthenticated device {} tried to call Farcaster API. Aborting.", - self.device_info.device_id + device_info.device_id ); return FarcasterAPIResponse { request_id, @@ -433,13 +462,13 @@ request.method, tunnelbroker_messages::farcaster::APIMethod::STREAM ) { - let response = - match self.farcaster_client.handle_stream_request(request).await { - Ok(()) => FarcasterAPIResponseData::Success( - "Message published to stream".to_string(), - ), - Err(err) => FarcasterAPIResponseData::Error(err.to_string()), - }; + let response = match farcaster_client.handle_stream_request(request).await + { + Ok(()) => FarcasterAPIResponseData::Success( + "Message published to stream".to_string(), + ), + Err(err) => FarcasterAPIResponseData::Error(err.to_string()), + }; return FarcasterAPIResponse { request_id, @@ -447,7 +476,7 @@ }; } - let response = match self.farcaster_client.api_request(request).await { + let response = match farcaster_client.api_request(request).await { Ok((status, response)) => { if status.is_success() { FarcasterAPIResponseData::Success(response) @@ -480,7 +509,8 @@ } pub async fn send_message_to_device(&mut self, message: Message) { - if let Err(e) = self.tx.send(message).await { + let mut tx = self.tx.lock().await; + if let Err(e) = tx.send(message).await { if should_ignore_error(&e) { debug!("Ignored error when sending message to device: {e:?}"); return; @@ -494,7 +524,8 @@ // Release WebSocket and remove from active connections pub async fn close(&mut self) { - if let Err(e) = self.tx.close().await { + let mut tx = self.tx.lock().await; + if let Err(e) = tx.close().await { debug!("Failed to close WebSocket session: {}", e); }