diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs index 6626c3397..2c73793b3 100644 --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -1,95 +1,95 @@ mod proto { tonic::include_proto!("tunnelbroker"); } use lapin::{options::BasicPublishOptions, BasicProperties}; use proto::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; use proto::Empty; use tonic::transport::Server; use tracing::debug; use crate::database::{handle_ddb_error, DatabaseClient}; use crate::{constants, CONFIG}; struct TunnelbrokerGRPC { client: DatabaseClient, amqp_channel: lapin::Channel, } // By setting mandatory to true, we don't wait for a confirmation for an eventual // delivery, instead we get an immediate undelivered error const PUBLISH_OPTIONS: BasicPublishOptions = BasicPublishOptions { immediate: false, mandatory: true, }; pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status { match error { lapin::Error::SerialisationError(_) | lapin::Error::ParsingError(_) => { tonic::Status::invalid_argument("Invalid argument") } _ => tonic::Status::internal("Internal Error"), } } #[tonic::async_trait] impl TunnelbrokerService for TunnelbrokerGRPC { async fn send_message_to_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Received message for {}", &message.device_id); if let Ok(confirmation) = self .amqp_channel .basic_publish( "", &message.device_id, PUBLISH_OPTIONS, &message.payload.as_bytes(), BasicProperties::default(), ) .await { debug!("Forwarded message: {:?}", &message); confirmation.await.map_err(handle_amqp_error)?; } else { self .client .persist_message(&message.device_id, &message.payload) .await .map_err(handle_ddb_error)?; } let response = tonic::Response::new(Empty {}); Ok(response) } } pub async fn run_server( client: DatabaseClient, ampq_connection: &lapin::Connection, ) -> Result<(), tonic::transport::Error> { - let addr = format!("[::1]:{}", CONFIG.grpc_port) + let addr = format!("[::]:{}", CONFIG.grpc_port) .parse() .expect("Unable to parse gRPC address"); let amqp_channel = ampq_connection .create_channel() .await .expect("Unable to create amqp channel"); - tracing::info!("Websocket server listening on {}", &addr); + tracing::info!("gRPC server listening on {}", &addr); Server::builder() .http2_keepalive_interval(Some(constants::GRPC_KEEP_ALIVE_PING_INTERVAL)) .http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT)) .add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC { client, amqp_channel, })) .serve(addr) .await } diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs index e3f65b502..88cbcd63f 100644 --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -1,130 +1,130 @@ mod session; use crate::database::DatabaseClient; use crate::websockets::session::SessionError; use crate::CONFIG; use futures_util::stream::SplitSink; use futures_util::StreamExt; use std::net::SocketAddr; use std::{env, io::Error}; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::WebSocketStream; use tracing::{debug, error, info}; use self::session::WebsocketSession; pub async fn run_server( db_client: DatabaseClient, amqp_connection: &lapin::Connection, ) -> Result<(), Error> { let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") - .unwrap_or_else(|_| format!("127.0.0.1:{}", &CONFIG.http_port)); + .unwrap_or_else(|_| format!("0.0.0.0:{}", &CONFIG.http_port)); let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); - info!("Listening on: {}", addr); + info!("WebSocket listening on: {}", addr); while let Ok((stream, addr)) = listener.accept().await { let channel = amqp_connection .create_channel() .await .expect("Unable to create amqp channel"); tokio::spawn(accept_connection(stream, addr, db_client.clone(), channel)); } Ok(()) } /// Handler for any incoming websocket connections async fn accept_connection( raw_stream: TcpStream, addr: SocketAddr, db_client: DatabaseClient, amqp_channel: lapin::Channel, ) { debug!("Incoming connection from: {}", addr); let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { Ok(stream) => stream, Err(e) => { info!( "Failed to establish connection with {}. Reason: {}", addr, e ); return; } }; let (outgoing, mut incoming) = ws_stream.split(); // We don't know the identity of the device until it sends the session // request over the websocket connection let mut session = if let Some(Ok(first_msg)) = incoming.next().await { match initiate_session(outgoing, first_msg, db_client, amqp_channel).await { Ok(session) => session, Err(_) => { error!("Failed to create session with device"); return; } } } else { error!("Failed to create session with device"); return; }; // Poll for messages either being sent to the device (rx) // or messages being received from the device (incoming) loop { debug!("Polling for messages from: {}", addr); tokio::select! { Some(Ok(delivery)) = session.next_amqp_message() => { if let Ok(message) = std::str::from_utf8(&delivery.data) { session.send_message_to_device(message.to_string()).await; } else { error!("Invalid payload"); } }, device_message = incoming.next() => { match device_message { Some(Ok(msg)) => { session::consume_error(session.handle_websocket_frame_from_device(msg).await); } _ => { debug!("Connection to {} closed remotely.", addr); break; } } }, else => { debug!("Unhealthy connection for: {}", addr); break; }, } } info!("Unregistering connection to: {}", addr); session.close().await } async fn initiate_session( outgoing: SplitSink, Message>, frame: Message, db_client: DatabaseClient, amqp_channel: lapin::Channel, ) -> Result { let mut session = session::WebsocketSession::from_frame( outgoing, db_client.clone(), frame, &amqp_channel, ) .await .map_err(|_| { error!("Device failed to send valid connection request."); SessionError::InvalidMessage })?; session::consume_error(session.deliver_persisted_messages().await); Ok(session) }