diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,6 +1,7 @@
 use comm_lib::database::batch_operations::ExponentialBackoffConfig;
 use lapin::{uri::AMQPUri, ConnectionProperties};
 use once_cell::sync::Lazy;
+use std::hash::Hasher;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
@@ -104,6 +105,39 @@
     })
   }
 
+  /// Picks the pooled AMQP channel assigned to the given identifier.
+  ///
+  /// `id_hash` is hashed with `std::hash::DefaultHasher` and the digest is
+  /// reduced modulo `NUM_AMQP_CHANNELS` to index into `self.channels`.
+  /// Round robin would spread the load too, but "hash modulo N" keeps the
+  /// same identifier on the same channel. Generally this shouldn't matter,
+  /// but it helps avoid potential issues with the same queue name being
+  /// declared by different channels in case of reconnection.
+  ///
+  /// Returns a clone of the selected channel; this body only ever yields
+  /// `Ok`. Indexing panics if `self.channels` has fewer than
+  /// `NUM_AMQP_CHANNELS` entries.
+  // NOTE(review): the return type shows no generic arguments in this patch
+  // text; confirm the full `Result<..>` type against the real file.
+  fn get_channel(
+    &self,
+    id_hash: impl std::hash::Hash,
+  ) -> Result {
+    let mut hasher = std::hash::DefaultHasher::new();
+    id_hash.hash(&mut hasher);
+    // Reduce in `u64` before narrowing: casting the digest to `usize` first
+    // would silently drop its upper bits wherever `usize` is narrower than
+    // 64 bits.
+    let bucket = hasher.finish() % (NUM_AMQP_CHANNELS as u64);
+    // Lossless: `bucket` is strictly below `NUM_AMQP_CHANNELS`.
+    let channel_idx = bucket as usize;
+
+    let channel = self.channels[channel_idx].clone();
+    let channel_id = channel.id();
+    tracing::trace!(channel_id, channel_idx, "Retrieving AMQP Channel");
+    Ok(channel)
+  }
+
   fn is_connected(&self) -> bool {
     self.conn.status().connected()
   }