Page MenuHomePhabricator

D10847.id36195.diff
No OneTemporary

D10847.id36195.diff

diff --git a/services/identity/src/websockets/mod.rs b/services/identity/src/websockets/mod.rs
--- a/services/identity/src/websockets/mod.rs
+++ b/services/identity/src/websockets/mod.rs
@@ -5,21 +5,20 @@
use elastic::client::responses::SearchResponse;
use futures::lock::Mutex;
-use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
-use hyper::upgrade::Upgraded;
use hyper::{Body, Request, Response, StatusCode};
use hyper_tungstenite::tungstenite::Message;
use hyper_tungstenite::HyperWebsocket;
-use hyper_tungstenite::WebSocketStream;
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tracing::{debug, error, info};
mod auth;
+mod send;
use crate::config::CONFIG;
use crate::constants::IDENTITY_SERVICE_WEBSOCKET_ADDR;
+use send::{send_error_response, WebsocketSink};
pub mod errors;
@@ -139,38 +138,7 @@
.await
}
-async fn send_error_response(
- error: errors::WebsocketError,
- outgoing: Arc<Mutex<SplitSink<WebSocketStream<Upgraded>, Message>>>,
-) {
- let response_msg = serde_json::json!({
- "action": "errorMessage",
- "error": format!("{}", error)
- });
-
- match serde_json::to_string(&response_msg) {
- Ok(serialized_response) => {
- if let Err(send_error) = outgoing
- .lock()
- .await
- .send(Message::Text(serialized_response))
- .await
- {
- error!("Failed to send error response: {:?}", send_error);
- }
- }
- Err(serialize_error) => {
- error!(
- "Failed to serialize the error response: {:?}",
- serialize_error
- );
- }
- }
-}
-
-async fn close_connection(
- outgoing: Arc<Mutex<SplitSink<WebSocketStream<Upgraded>, Message>>>,
-) {
+async fn close_connection(outgoing: WebsocketSink) {
if let Err(e) = outgoing.lock().await.close().await {
error!("Error closing connection: {}", e);
}
diff --git a/services/identity/src/websockets/send.rs b/services/identity/src/websockets/send.rs
new file mode 100644
--- /dev/null
+++ b/services/identity/src/websockets/send.rs
@@ -0,0 +1,45 @@
+use std::sync::Arc;
+
+use futures::lock::Mutex;
+use futures_util::stream::SplitSink;
+use futures_util::SinkExt;
+use hyper::upgrade::Upgraded;
+use hyper_tungstenite::tungstenite::Message;
+use hyper_tungstenite::WebSocketStream;
+use tracing::error;
+
+use crate::websockets::errors;
+
+/// Shared handle to the outgoing half of an upgraded websocket connection.
+///
+/// The sink is wrapped in an `Arc` and an async `futures::lock::Mutex`, so
+/// callers lock it (`.lock().await`) before sending on it or closing it.
+pub type WebsocketSink =
+ Arc<Mutex<SplitSink<WebSocketStream<Upgraded>, Message>>>;
+
+pub async fn send_error_response(
+ error: errors::WebsocketError,
+ outgoing: Arc<Mutex<SplitSink<WebSocketStream<Upgraded>, Message>>>,
+) {
+ let response_msg = serde_json::json!({
+ "action": "errorMessage",
+ "error": format!("{}", error)
+ });
+
+ match serde_json::to_string(&response_msg) {
+ Ok(serialized_response) => {
+ send_message(Message::Text(serialized_response), outgoing).await;
+ }
+ Err(serialize_error) => {
+ error!(
+ "Failed to serialize the error response: {:?}",
+ serialize_error
+ );
+ }
+ }
+}
+
+/// Sends `message` to the client over the shared websocket sink.
+///
+/// The sink is locked before the send is attempted. Sending is best-effort:
+/// a failed send is logged and is not reported back to the caller.
+pub async fn send_message(message: Message, outgoing: WebsocketSink) {
+  if let Err(e) = outgoing.lock().await.send(message).await {
+    error!("Failed to send message to device: {}", e);
+  }
+}

File Metadata

Mime Type
text/plain
Expires
Tue, Oct 8, 4:28 AM (21 h, 49 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2258693
Default Alt Text
D10847.id36195.diff (3 KB)

Event Timeline