Changeset View
Changeset View
Standalone View
Standalone View
services/tunnelbroker/src/websockets/session.rs
use tracing::debug; | use tracing::debug; | ||||
use tunnelbroker_messages::Messages; | use tunnelbroker_messages::Messages; | ||||
use crate::ACTIVE_CONNECTIONS; | use crate::{ | ||||
constants::dynamodb::undelivered_messages::CREATED_AT, | |||||
database::DatabaseClient, ACTIVE_CONNECTIONS, | |||||
}; | |||||
pub struct WebsocketSession { | pub struct WebsocketSession { | ||||
tx: tokio::sync::mpsc::UnboundedSender<std::string::String>, | tx: tokio::sync::mpsc::UnboundedSender<std::string::String>, | ||||
db_client: DatabaseClient, | |||||
} | } | ||||
impl WebsocketSession { | impl WebsocketSession { | ||||
pub fn new( | pub fn new( | ||||
tx: tokio::sync::mpsc::UnboundedSender<std::string::String>, | tx: tokio::sync::mpsc::UnboundedSender<std::string::String>, | ||||
db_client: DatabaseClient, | |||||
) -> WebsocketSession { | ) -> WebsocketSession { | ||||
WebsocketSession { tx } | WebsocketSession { tx, db_client } | ||||
} | } | ||||
pub fn handle_message_from_device( | pub async fn handle_message_from_device( | ||||
&self, | &self, | ||||
message: &str, | message: &str, | ||||
) -> Result<(), serde_json::Error> { | ) -> Result<(), serde_json::Error> { | ||||
match serde_json::from_str::<Messages>(message)? { | match serde_json::from_str::<Messages>(message)? { | ||||
Messages::SessionRequest(session_info) => { | Messages::SessionRequest(session_info) => { | ||||
ACTIVE_CONNECTIONS.insert(session_info.device_id, self.tx.clone()); | // TODO: Authenticate device using auth token | ||||
// Check for persisted messages | |||||
let messages = self | |||||
.db_client | |||||
.retreive_messages(&session_info.device_id) | |||||
.await | |||||
.expect("Failed to retreive messages"); | |||||
ACTIVE_CONNECTIONS | |||||
.insert(session_info.device_id.clone(), self.tx.clone()); | |||||
for message in messages { | |||||
let payload = | |||||
message.get("payload").unwrap().as_s().unwrap().to_string(); | |||||
self | |||||
.tx | |||||
.send(payload) | |||||
.expect("Failed to send message to client"); | |||||
let created_at = | |||||
message.get(CREATED_AT).unwrap().as_n().unwrap().to_string(); | |||||
self | |||||
.db_client | |||||
.delete_message(&session_info.device_id, &created_at) | |||||
.await | |||||
.expect("Failed to delete messages"); | |||||
} | |||||
debug!("Flushed messages for device: {}", &session_info.device_id); | |||||
} | } | ||||
_ => { | _ => { | ||||
debug!("Received invalid request"); | debug!("Received invalid request"); | ||||
} | } | ||||
} | } | ||||
Ok(()) | Ok(()) | ||||
} | } | ||||
} | } |