Page MenuHomePhabricator

D13602.diff
No OneTemporary

D13602.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,6 +1,7 @@
use comm_lib::database::batch_operations::ExponentialBackoffConfig;
use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
+use std::hash::Hasher;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
@@ -104,6 +105,26 @@
})
}
+ fn get_channel(
+ &self,
+ id_hash: impl std::hash::Hash,
+ ) -> Result<lapin::Channel, lapin::Error> {
+ // Map the device to a pooled channel via "hash modulo N" instead of round
+ // robin, so the same `id_hash` always yields the same channel index
+ // (`DefaultHasher::new()` is deterministic within a given Rust release).
+ // Generally this shouldn't matter, but it helps avoid potential issues
+ // with the same queue name being declared by different channels in case
+ // of reconnection. NOTE(review): assumes `channels` has N entries — confirm.
+ let mut hasher = std::hash::DefaultHasher::new();
+ id_hash.hash(&mut hasher);
+ let channel_idx: usize = hasher.finish() as usize % NUM_AMQP_CHANNELS;
+
+ let channel = self.channels[channel_idx].clone();
+ let channel_id = channel.id();
+ tracing::trace!(channel_id, channel_idx, "Retrieving AMQP Channel");
+ Ok(channel)
+ }
+
fn is_connected(&self) -> bool {
self.conn.status().connected()
}

File Metadata

Mime Type
text/plain
Expires
Sat, Oct 5, 9:22 PM (22 h, 4 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2240043
Default Alt Text
D13602.diff (1 KB)

Event Timeline