diff --git a/services/identity/src/websockets/mod.rs b/services/identity/src/websockets/mod.rs --- a/services/identity/src/websockets/mod.rs +++ b/services/identity/src/websockets/mod.rs @@ -5,21 +5,20 @@ use elastic::client::responses::SearchResponse; use futures::lock::Mutex; -use futures_util::stream::SplitSink; use futures_util::{SinkExt, StreamExt}; -use hyper::upgrade::Upgraded; use hyper::{Body, Request, Response, StatusCode}; use hyper_tungstenite::tungstenite::Message; use hyper_tungstenite::HyperWebsocket; -use hyper_tungstenite::WebSocketStream; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use tracing::{debug, error, info}; mod auth; +mod send; use crate::config::CONFIG; use crate::constants::IDENTITY_SERVICE_WEBSOCKET_ADDR; +use send::{send_error_response, WebsocketSink}; pub mod errors; @@ -138,38 +137,7 @@ .await } -async fn send_error_response( - error: errors::WebsocketError, - outgoing: Arc, Message>>>, -) { - let response_msg = serde_json::json!({ - "action": "errorMessage", - "error": format!("{}", error) - }); - - match serde_json::to_string(&response_msg) { - Ok(serialized_response) => { - if let Err(send_error) = outgoing - .lock() - .await - .send(Message::Text(serialized_response)) - .await - { - error!("Failed to send error response: {:?}", send_error); - } - } - Err(serialize_error) => { - error!( - "Failed to serialize the error response: {:?}", - serialize_error - ); - } - } -} - -async fn close_connection( - outgoing: Arc, Message>>>, -) { +async fn close_connection(outgoing: WebsocketSink) { if let Err(e) = outgoing.lock().await.close().await { error!("Error closing connection: {}", e); } diff --git a/services/identity/src/websockets/send.rs b/services/identity/src/websockets/send.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/websockets/send.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use futures::lock::Mutex; +use futures_util::stream::SplitSink; +use futures_util::SinkExt; +use hyper::upgrade::Upgraded; +use hyper_tungstenite::tungstenite::Message; +use hyper_tungstenite::WebSocketStream; +use tracing::error; + +use crate::websockets::errors; + +pub type WebsocketSink = + Arc, Message>>>; + +pub async fn send_error_response( + error: errors::WebsocketError, + outgoing: Arc, Message>>>, +) { + let response_msg = serde_json::json!({ + "action": "errorMessage", + "error": format!("{}", error) + }); + + match serde_json::to_string(&response_msg) { + Ok(serialized_response) => { + send_message(Message::Text(serialized_response), outgoing).await; + } + Err(serialize_error) => { + error!( + "Failed to serialize the error response: {:?}", + serialize_error + ); + } + } +} + +pub async fn send_message( + message: Message, + outgoing: Arc, Message>>>, +) { + if let Err(e) = outgoing.lock().await.send(message).await { + error!("Failed to send message to device: {}", e); + } +}