Page Menu | Home | Phorge

D9571.1768476150.diff
No One | Temporary

Size
7 KB
Referenced Files
None
Subscribers
None

D9571.1768476150.diff

diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -5,6 +5,11 @@
pub const GRPC_KEEP_ALIVE_PING_INTERVAL: Duration = Duration::from_secs(3);
pub const GRPC_KEEP_ALIVE_PING_TIMEOUT: Duration = Duration::from_secs(10);
+pub const MAX_RMQ_MSG_PRIORITY: u8 = 10;
+pub const DDB_RMQ_MSG_PRIORITY: u8 = 10;
+pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1;
+pub const RMQ_CONSUMER_TAG: &str = "tunnelbroker";
+
pub const LOG_LEVEL_ENV_VAR: &str =
tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -10,6 +10,7 @@
use tonic::transport::Server;
use tracing::debug;
+use crate::constants::CLIENT_RMQ_MSG_PRIORITY;
use crate::database::{handle_ddb_error, DatabaseClient};
use crate::{constants, CONFIG};
@@ -52,7 +53,7 @@
&message.device_id,
BasicPublishOptions::default(),
message.payload.as_bytes(),
- BasicProperties::default(),
+ BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
)
.await
.map_err(handle_amqp_error)?;
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -233,7 +233,7 @@
db_client: DatabaseClient,
amqp_channel: lapin::Channel,
) -> Result<WebsocketSession<S>, session::SessionError> {
- let mut session = session::WebsocketSession::from_frame(
+ let session = session::WebsocketSession::from_frame(
outgoing,
db_client.clone(),
frame,
@@ -245,7 +245,5 @@
SessionError::InvalidMessage
})?;
- session::consume_error(session.deliver_persisted_messages().await);
-
Ok(session)
}
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -1,3 +1,7 @@
+use crate::constants::{
+ CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY, MAX_RMQ_MSG_PRIORITY,
+ RMQ_CONSUMER_TAG,
+};
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::operation::put_item::PutItemError;
use derive_more;
@@ -22,7 +26,6 @@
};
use crate::database::{self, DatabaseClient, DeviceMessage};
-use crate::error::Error;
use crate::identity;
pub struct DeviceInfo {
@@ -55,16 +58,10 @@
PersistenceError(SdkError<PutItemError>),
}
-pub fn consume_error<T>(result: Result<T, SessionError>) {
- if let Err(e) = result {
- error!("{}", e)
- }
-}
-
// Parse a session request and retrieve the device information
pub async fn handle_first_message_from_device(
message: &str,
-) -> Result<DeviceInfo, Error> {
+) -> Result<DeviceInfo, SessionError> {
let serialized_message = serde_json::from_str::<Messages>(message)?;
match serialized_message {
@@ -89,11 +86,11 @@
match auth_request {
Err(e) => {
error!("Failed to complete request to identity service: {:?}", e);
- return Err(SessionError::InternalError.into());
+ return Err(SessionError::InternalError);
}
Ok(false) => {
info!("Device failed authentication: {}", &session_info.device_id);
- return Err(SessionError::UnauthorizedDevice.into());
+ return Err(SessionError::UnauthorizedDevice);
}
Ok(true) => {
debug!(
@@ -107,42 +104,82 @@
}
_ => {
debug!("Received invalid request");
- Err(SessionError::InvalidMessage.into())
+ Err(SessionError::InvalidMessage)
}
}
}
+async fn publish_persisted_messages(
+ db_client: &DatabaseClient,
+ amqp_channel: &lapin::Channel,
+ device_info: &DeviceInfo,
+) -> Result<(), SessionError> {
+ let messages = db_client
+ .retrieve_messages(&device_info.device_id)
+ .await
+ .unwrap_or_else(|e| {
+ error!("Error while retrieving messages: {}", e);
+ Vec::new()
+ });
+
+ for message in messages {
+ let device_message = DeviceMessage::from_hashmap(message)?;
+
+ amqp_channel
+ .basic_publish(
+ "",
+ &device_message.device_id,
+ BasicPublishOptions::default(),
+ device_message.payload.as_bytes(),
+ BasicProperties::default().with_priority(DDB_RMQ_MSG_PRIORITY),
+ )
+ .await?;
+
+ if let Err(e) = db_client
+ .delete_message(&device_info.device_id, &device_message.message_id)
+ .await
+ {
+ error!("Failed to delete message: {}:", e);
+ }
+ }
+
+ debug!("Flushed messages for device: {}", &device_info.device_id);
+ Ok(())
+}
+
impl<S: AsyncRead + AsyncWrite + Unpin> WebsocketSession<S> {
pub async fn from_frame(
tx: SplitSink<WebSocketStream<S>, Message>,
db_client: DatabaseClient,
frame: Message,
amqp_channel: &lapin::Channel,
- ) -> Result<WebsocketSession<S>, Error> {
+ ) -> Result<WebsocketSession<S>, SessionError> {
let device_info = match frame {
Message::Text(payload) => {
handle_first_message_from_device(&payload).await?
}
_ => {
error!("Client sent wrong frame type for establishing connection");
- return Err(SessionError::InvalidMessage.into());
+ return Err(SessionError::InvalidMessage);
}
};
- // We don't currently have a use case to interact directly with the queue,
- // however, we need to declare a queue for a given device
+ let mut args = FieldTable::default();
+ args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into());
amqp_channel
.queue_declare(
&device_info.device_id,
QueueDeclareOptions::default(),
- FieldTable::default(),
+ args,
)
.await?;
+ publish_persisted_messages(&db_client, amqp_channel, &device_info).await?;
+
let amqp_consumer = amqp_channel
.basic_consume(
&device_info.device_id,
- "tunnelbroker",
+ RMQ_CONSUMER_TAG,
BasicConsumeOptions::default(),
FieldTable::default(),
)
@@ -177,7 +214,7 @@
&message_request.device_id,
BasicPublishOptions::default(),
message_request.payload.as_bytes(),
- BasicProperties::default(),
+ BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
)
.await;
@@ -223,41 +260,6 @@
self.amqp_consumer.next().await
}
- pub async fn deliver_persisted_messages(
- &mut self,
- ) -> Result<(), SessionError> {
- // Check for persisted messages
- let messages = self
- .db_client
- .retrieve_messages(&self.device_info.device_id)
- .await
- .unwrap_or_else(|e| {
- error!("Error while retrieving messages: {}", e);
- Vec::new()
- });
-
- for message in messages {
- let device_message = DeviceMessage::from_hashmap(message)?;
- self
- .send_message_to_device(Message::Text(device_message.payload))
- .await;
- if let Err(e) = self
- .db_client
- .delete_message(&self.device_info.device_id, &device_message.message_id)
- .await
- {
- error!("Failed to delete message: {}:", e);
- }
- }
-
- debug!(
- "Flushed messages for device: {}",
- &self.device_info.device_id
- );
-
- Ok(())
- }
-
pub async fn send_message_to_device(&mut self, message: Message) {
if let Err(e) = self.tx.send(message).await {
error!("Failed to send message to device: {}", e);

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 15, 11:22 AM (7 h, 6 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5937518
Default Alt Text
D9571.1768476150.diff (7 KB)

Event Timeline