Page MenuHomePhabricator

D13621.id44941.diff
No OneTemporary

D13621.id44941.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -3,7 +3,7 @@
use once_cell::sync::Lazy;
use std::sync::Arc;
use std::time::Duration;
-use tokio::sync::RwLock;
+use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
use crate::constants::error_types;
@@ -127,6 +127,40 @@
}
}
+/// Wrapper over [`lapin::Channel`] that automatically recreates the AMQP
+/// channel when it is no longer connected. The channel is created on first use.
+///
+/// TODO: Add support for restoring channel topology (queues and consumers).
+/// (`lapin` has this built in, but only as an internal crate feature.)
+pub struct AmqpChannel {
+ conn: AmqpConnection,
+ channel: Arc<Mutex<Option<lapin::Channel>>>,
+}
+
+impl AmqpChannel {
+ pub fn new(amqp_connection: &AmqpConnection) -> Self {
+ let channel = Arc::new(Mutex::new(None));
+ Self {
+ conn: amqp_connection.clone(),
+ channel,
+ }
+ }
+
+ pub async fn get(&self) -> Result<lapin::Channel, lapin::Error> {
+ let mut channel = self.channel.lock().await;
+ match channel.as_ref() {
+ Some(ch) if ch.status().connected() => Ok(ch.clone()),
+ _ => {
+ let new_channel = self.conn.new_channel().await?;
+ let channel_id = new_channel.id();
+ debug!(channel_id, "Instantiated lazy AMQP channel.");
+ *channel = Some(new_channel.clone());
+ Ok(new_channel)
+ }
+ }
+ }
+}
+
fn should_ignore_error(err: &lapin::Error) -> bool {
use lapin::Error as E;
use std::io::ErrorKind;
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -11,14 +11,14 @@
use tracing::debug;
use tunnelbroker_messages::MessageToDevice;
-use crate::amqp::AmqpConnection;
+use crate::amqp::{AmqpChannel, AmqpConnection};
use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
use crate::database::{handle_ddb_error, DatabaseClient};
use crate::{constants, CONFIG};
struct TunnelbrokerGRPC {
client: DatabaseClient,
- amqp: AmqpConnection,
+ amqp_channel: AmqpChannel,
}
pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status {
@@ -58,8 +58,8 @@
.map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?;
self
- .amqp
- .new_channel()
+ .amqp_channel
+ .get()
.await
.map_err(handle_amqp_error)?
.basic_publish(
@@ -85,8 +85,8 @@
debug!("Connection close request for device {}", &message.device_id);
self
- .amqp
- .new_channel()
+ .amqp_channel
+ .get()
.await
.map_err(handle_amqp_error)?
.basic_publish(
@@ -141,7 +141,7 @@
.http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT))
.add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC {
client,
- amqp: amqp_connection.clone(),
+ amqp_channel: AmqpChannel::new(amqp_connection),
}))
.serve(addr)
.await

File Metadata

Mime Type
text/plain
Expires
Wed, Oct 9, 7:21 PM (21 h, 49 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2263037
Default Alt Text
D13621.id44941.diff (3 KB)

Event Timeline