Changeset View
Changeset View
Standalone View
Standalone View
services/tunnelbroker/src/websockets/session.rs
use derive_more; | use derive_more; | ||||
use futures_util::stream::SplitSink; | use futures_util::stream::SplitSink; | ||||
use futures_util::SinkExt; | use futures_util::SinkExt; | ||||
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender}; | use tokio::{net::TcpStream, sync::mpsc::UnboundedSender}; | ||||
use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; | use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; | ||||
use tracing::{debug, error}; | use tracing::{debug, error}; | ||||
use tunnelbroker_messages::{session::DeviceTypes, Messages}; | use tunnelbroker_messages::{session::DeviceTypes, Messages}; | ||||
use crate::{ | use crate::{ | ||||
constants::dynamodb::undelivered_messages::CREATED_AT, | database::{self, DatabaseClient, DeviceMessage}, | ||||
database::DatabaseClient, ACTIVE_CONNECTIONS, | ACTIVE_CONNECTIONS, | ||||
}; | }; | ||||
pub struct DeviceInfo { | pub struct DeviceInfo { | ||||
pub device_id: String, | pub device_id: String, | ||||
pub notify_token: Option<String>, | pub notify_token: Option<String>, | ||||
pub device_type: DeviceTypes, | pub device_type: DeviceTypes, | ||||
pub device_app_version: Option<String>, | pub device_app_version: Option<String>, | ||||
pub device_os: Option<String>, | pub device_os: Option<String>, | ||||
} | } | ||||
pub struct WebsocketSession { | pub struct WebsocketSession { | ||||
tx: SplitSink<WebSocketStream<TcpStream>, Message>, | tx: SplitSink<WebSocketStream<TcpStream>, Message>, | ||||
db_client: DatabaseClient, | db_client: DatabaseClient, | ||||
device_info: Option<DeviceInfo>, | device_info: Option<DeviceInfo>, | ||||
} | } | ||||
#[derive(Debug, derive_more::Display, derive_more::From)] | #[derive(Debug, derive_more::Display, derive_more::From)] | ||||
pub enum SessionError { | pub enum SessionError { | ||||
InvalidMessage, | InvalidMessage, | ||||
SerializationError(serde_json::Error), | SerializationError(serde_json::Error), | ||||
MessageError(database::MessageErrors), | |||||
} | } | ||||
fn consume_error<T>(result: Result<T, SessionError>) { | fn consume_error<T>(result: Result<T, SessionError>) { | ||||
if let Err(e) = result { | if let Err(e) = result { | ||||
error!("{}", e) | error!("{}", e) | ||||
} | } | ||||
} | } | ||||
impl WebsocketSession { | impl WebsocketSession { | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | match serialized_message { | ||||
.unwrap_or_else(|e| { | .unwrap_or_else(|e| { | ||||
error!("Error while retrieving messages: {}", e); | error!("Error while retrieving messages: {}", e); | ||||
Vec::new() | Vec::new() | ||||
}); | }); | ||||
ACTIVE_CONNECTIONS.insert(device_info.device_id.clone(), tx.clone()); | ACTIVE_CONNECTIONS.insert(device_info.device_id.clone(), tx.clone()); | ||||
for message in messages { | for message in messages { | ||||
let payload = | let device_message = DeviceMessage::from_hashmap(message)?; | ||||
message.get("payload").unwrap().as_s().unwrap().to_string(); | self.send_message_to_device(device_message.payload).await; | ||||
let created_at = | if let Err(e) = self | ||||
message.get(CREATED_AT).unwrap().as_n().unwrap().to_string(); | |||||
self.send_message_to_device(payload).await; | |||||
self | |||||
.db_client | .db_client | ||||
.delete_message(&device_info.device_id, &created_at) | .delete_message(&device_info.device_id, &device_message.created_at) | ||||
.await | .await | ||||
.expect("Failed to delete messages"); | { | ||||
error!("Failed to delete message: {}:", e); | |||||
} | |||||
} | } | ||||
debug!("Flushed messages for device: {}", &session_info.device_id); | debug!("Flushed messages for device: {}", &session_info.device_id); | ||||
self.device_info = Some(device_info); | self.device_info = Some(device_info); | ||||
} | } | ||||
_ => { | _ => { | ||||
debug!("Received invalid request"); | debug!("Received invalid request"); | ||||
Show All 23 Lines |