diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs index 1b947a3f0..38cca467b 100644 --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -1,160 +1,194 @@ use comm_lib::database::batch_operations::ExponentialBackoffConfig; use lapin::{uri::AMQPUri, ConnectionProperties}; use once_cell::sync::Lazy; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, info, warn}; use crate::constants::error_types; use crate::CONFIG; static AMQP_URI: Lazy = Lazy::new(|| { let mut amqp_uri = CONFIG .amqp_uri .parse::() .expect("Invalid AMQP URI"); // Allow set / override credentials using env vars if let Some(amqp_user) = from_env("AMQP_USERNAME") { amqp_uri.authority.userinfo.username = amqp_user; } if let Some(amqp_pass) = from_env("AMQP_PASSWORD") { amqp_uri.authority.userinfo.password = amqp_pass; } amqp_uri }); async fn create_connection() -> Result { let options = ConnectionProperties::default() .with_executor(tokio_executor_trait::Tokio::current()) .with_reactor(tokio_reactor_trait::Tokio); let retry_config = ExponentialBackoffConfig { max_attempts: 5, base_duration: Duration::from_millis(500), ..Default::default() }; let mut retry_counter = retry_config.new_counter(); tracing::debug!("Attempting to connect to AMQP..."); loop { let amqp_uri = Lazy::force(&AMQP_URI).clone(); match lapin::Connection::connect_uri(amqp_uri, options.clone()).await { Ok(conn) => return Ok(conn), Err(err) => { let attempt = retry_counter.attempt(); tracing::warn!(attempt, "AMQP connection attempt failed: {err}."); if retry_counter.sleep_and_retry().await.is_err() { tracing::error!("Unable to connect to AMQP: {err}"); return Err(err); } } } } } /// Inner connection that is a direct wrapper over [`lapin::Connection`] /// This should be instantiated only once to establish connection /// New instances can be created to reconnect struct ConnectionInner { conn: 
lapin::Connection, } impl ConnectionInner { async fn new() -> Result { let conn = create_connection().await?; conn.on_error(|err| { if should_ignore_error(&err) { debug!("Ignored AMQP Lapin error: {err:?}"); return; } error!(errorType = error_types::AMQP_ERROR, "Lapin error: {err:?}"); }); Ok(Self { conn }) } fn is_connected(&self) -> bool { self.conn.status().connected() } } /// Thread safe connection wrapper that is `Clone + Send + Sync` /// and can be shared wherever needed. #[derive(Clone)] pub struct AmqpConnection { inner: Arc>, } impl AmqpConnection { pub async fn connect() -> Result { let conn = ConnectionInner::new().await?; let inner = Arc::new(RwLock::new(conn)); info!("Connected to AMQP endpoint: {}", &CONFIG.amqp_uri); Ok(Self { inner }) } pub async fn new_channel(&self) -> Result { if !self.is_connected().await { warn!("AMQP disconnected while retrieving channel"); self.reset_conn().await?; } self.inner.read().await.conn.create_channel().await } async fn reset_conn(&self) -> Result<(), lapin::Error> { let mut inner = self.inner.write().await; if !inner.is_connected() { debug!("Resetting AMQP connection..."); let new_conn = ConnectionInner::new().await?; *inner = new_conn; info!("AMQP Connection restored."); } Ok(()) } async fn is_connected(&self) -> bool { self.inner.read().await.is_connected() } pub fn maybe_reconnect_in_background(&self) { let this = self.clone(); tokio::spawn(async move { this.reset_conn().await }); } } +/// Wrapper over [`lapin::Channel`] that automatically recreates the AMQP channel +/// in case of errors. The channel is initialized on first use. 
+/// +/// TODO: Add support for restoring channel topology (queues and consumers) +/// (`lapin` has this built-in, but it's an internal crate feature) +pub struct AmqpChannel { + conn: AmqpConnection, + channel: Arc>>, +} + +impl AmqpChannel { + pub fn new(amqp_connection: &AmqpConnection) -> Self { + let channel = Arc::new(Mutex::new(None)); + Self { + conn: amqp_connection.clone(), + channel, + } + } + + pub async fn get(&self) -> Result { + let mut channel = self.channel.lock().await; + match channel.as_ref() { + Some(ch) if ch.status().connected() => Ok(ch.clone()), + _ => { + let new_channel = self.conn.new_channel().await?; + let channel_id = new_channel.id(); + debug!(channel_id, "Instantiated lazy AMQP channel."); + *channel = Some(new_channel.clone()); + Ok(new_channel) + } + } + } +} + fn should_ignore_error(err: &lapin::Error) -> bool { use lapin::Error as E; use std::io::ErrorKind; if is_connection_error(err) { return true; } if let E::IOError(io_error) = err { return match io_error.kind() { // Suppresses: "Socket was readable but we read 0."" // We handle this by auto-reconnecting ErrorKind::ConnectionAborted => true, _ => false, }; } false } pub fn is_connection_error(err: &lapin::Error) -> bool { matches!( err, lapin::Error::InvalidChannelState(_) | lapin::Error::InvalidConnectionState(_) ) } fn from_env(var_name: &str) -> Option { std::env::var(var_name).ok().filter(|s| !s.is_empty()) } diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs index d32a5b51b..147a00253 100644 --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -1,148 +1,148 @@ mod proto { tonic::include_proto!("tunnelbroker"); } use lapin::{options::BasicPublishOptions, BasicProperties}; use proto::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; use proto::Empty; use tonic::transport::Server; use tracing::debug; use tunnelbroker_messages::MessageToDevice; -use 
crate::amqp::AmqpConnection; +use crate::amqp::{AmqpChannel, AmqpConnection}; use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG}; use crate::database::{handle_ddb_error, DatabaseClient}; use crate::{constants, CONFIG}; struct TunnelbrokerGRPC { client: DatabaseClient, - amqp: AmqpConnection, + amqp_channel: AmqpChannel, } pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status { match error { lapin::Error::SerialisationError(_) | lapin::Error::ParsingError(_) => { tonic::Status::invalid_argument("Invalid argument") } _ => tonic::Status::internal("Internal Error"), } } #[tonic::async_trait] impl TunnelbrokerService for TunnelbrokerGRPC { async fn send_message_to_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Received message for {}", &message.device_id); let client_message_id = uuid::Uuid::new_v4().to_string(); let message_id = self .client .persist_message(&message.device_id, &message.payload, &client_message_id) .await .map_err(handle_ddb_error)?; let message_to_device = MessageToDevice { device_id: message.device_id.clone(), payload: message.payload, message_id, }; let serialized_message = serde_json::to_string(&message_to_device) .map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?; self - .amqp - .new_channel() + .amqp_channel + .get() .await .map_err(handle_amqp_error)? .basic_publish( "", &message.device_id, BasicPublishOptions::default(), serialized_message.as_bytes(), BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), ) .await .map_err(handle_amqp_error)?; let response = tonic::Response::new(Empty {}); Ok(response) } async fn force_close_device_connection( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Connection close request for device {}", &message.device_id); self - .amqp - .new_channel() + .amqp_channel + .get() .await .map_err(handle_amqp_error)? 
.basic_publish( "", &message.device_id, BasicPublishOptions::default(), WS_SESSION_CLOSE_AMQP_MSG.as_bytes(), BasicProperties::default() // Connection close request should have higher priority .with_priority(CLIENT_RMQ_MSG_PRIORITY + 1) // The message should expire quickly. If the device isn't connected // (there's no consumer), there's no point in keeping this message. .with_expiration("1000".into()), ) .await .map_err(handle_amqp_error)?; let response = tonic::Response::new(Empty {}); Ok(response) } async fn delete_device_data( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Deleting {} data", &message.device_id); self .client .remove_device_token(&message.device_id) .await .map_err(|_| tonic::Status::failed_precondition("unexpected error"))?; let response = tonic::Response::new(Empty {}); Ok(response) } } pub async fn run_server( client: DatabaseClient, amqp_connection: &AmqpConnection, ) -> Result<(), tonic::transport::Error> { let addr = format!("[::]:{}", CONFIG.grpc_port) .parse() .expect("Unable to parse gRPC address"); tracing::info!("gRPC server listening on {}", &addr); Server::builder() .http2_keepalive_interval(Some(constants::GRPC_KEEP_ALIVE_PING_INTERVAL)) .http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT)) .add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC { client, - amqp: amqp_connection.clone(), + amqp_channel: AmqpChannel::new(amqp_connection), })) .serve(addr) .await }