Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F33105167
D9571.1768476150.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
7 KB
Referenced Files
None
Subscribers
None
D9571.1768476150.diff
View Options
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -5,6 +5,11 @@
pub const GRPC_KEEP_ALIVE_PING_INTERVAL: Duration = Duration::from_secs(3);
pub const GRPC_KEEP_ALIVE_PING_TIMEOUT: Duration = Duration::from_secs(10);
+pub const MAX_RMQ_MSG_PRIORITY: u8 = 10;
+pub const DDB_RMQ_MSG_PRIORITY: u8 = 10;
+pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1;
+pub const RMQ_CONSUMER_TAG: &str = "tunnelbroker";
+
pub const LOG_LEVEL_ENV_VAR: &str =
tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -10,6 +10,7 @@
use tonic::transport::Server;
use tracing::debug;
+use crate::constants::CLIENT_RMQ_MSG_PRIORITY;
use crate::database::{handle_ddb_error, DatabaseClient};
use crate::{constants, CONFIG};
@@ -52,7 +53,7 @@
&message.device_id,
BasicPublishOptions::default(),
message.payload.as_bytes(),
- BasicProperties::default(),
+ BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
)
.await
.map_err(handle_amqp_error)?;
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -233,7 +233,7 @@
db_client: DatabaseClient,
amqp_channel: lapin::Channel,
) -> Result<WebsocketSession<S>, session::SessionError> {
- let mut session = session::WebsocketSession::from_frame(
+ let session = session::WebsocketSession::from_frame(
outgoing,
db_client.clone(),
frame,
@@ -245,7 +245,5 @@
SessionError::InvalidMessage
})?;
- session::consume_error(session.deliver_persisted_messages().await);
-
Ok(session)
}
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -1,3 +1,7 @@
+use crate::constants::{
+ CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY, MAX_RMQ_MSG_PRIORITY,
+ RMQ_CONSUMER_TAG,
+};
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::operation::put_item::PutItemError;
use derive_more;
@@ -22,7 +26,6 @@
};
use crate::database::{self, DatabaseClient, DeviceMessage};
-use crate::error::Error;
use crate::identity;
pub struct DeviceInfo {
@@ -55,16 +58,10 @@
PersistenceError(SdkError<PutItemError>),
}
-pub fn consume_error<T>(result: Result<T, SessionError>) {
- if let Err(e) = result {
- error!("{}", e)
- }
-}
-
// Parse a session request and retrieve the device information
pub async fn handle_first_message_from_device(
message: &str,
-) -> Result<DeviceInfo, Error> {
+) -> Result<DeviceInfo, SessionError> {
let serialized_message = serde_json::from_str::<Messages>(message)?;
match serialized_message {
@@ -89,11 +86,11 @@
match auth_request {
Err(e) => {
error!("Failed to complete request to identity service: {:?}", e);
- return Err(SessionError::InternalError.into());
+ return Err(SessionError::InternalError);
}
Ok(false) => {
info!("Device failed authentication: {}", &session_info.device_id);
- return Err(SessionError::UnauthorizedDevice.into());
+ return Err(SessionError::UnauthorizedDevice);
}
Ok(true) => {
debug!(
@@ -107,42 +104,82 @@
}
_ => {
debug!("Received invalid request");
- Err(SessionError::InvalidMessage.into())
+ Err(SessionError::InvalidMessage)
}
}
}
+async fn publish_persisted_messages(
+ db_client: &DatabaseClient,
+ amqp_channel: &lapin::Channel,
+ device_info: &DeviceInfo,
+) -> Result<(), SessionError> {
+ let messages = db_client
+ .retrieve_messages(&device_info.device_id)
+ .await
+ .unwrap_or_else(|e| {
+ error!("Error while retrieving messages: {}", e);
+ Vec::new()
+ });
+
+ for message in messages {
+ let device_message = DeviceMessage::from_hashmap(message)?;
+
+ amqp_channel
+ .basic_publish(
+ "",
+ &device_message.device_id,
+ BasicPublishOptions::default(),
+ device_message.payload.as_bytes(),
+ BasicProperties::default().with_priority(DDB_RMQ_MSG_PRIORITY),
+ )
+ .await?;
+
+ if let Err(e) = db_client
+ .delete_message(&device_info.device_id, &device_message.message_id)
+ .await
+ {
+ error!("Failed to delete message: {}:", e);
+ }
+ }
+
+ debug!("Flushed messages for device: {}", &device_info.device_id);
+ Ok(())
+}
+
impl<S: AsyncRead + AsyncWrite + Unpin> WebsocketSession<S> {
pub async fn from_frame(
tx: SplitSink<WebSocketStream<S>, Message>,
db_client: DatabaseClient,
frame: Message,
amqp_channel: &lapin::Channel,
- ) -> Result<WebsocketSession<S>, Error> {
+ ) -> Result<WebsocketSession<S>, SessionError> {
let device_info = match frame {
Message::Text(payload) => {
handle_first_message_from_device(&payload).await?
}
_ => {
error!("Client sent wrong frame type for establishing connection");
- return Err(SessionError::InvalidMessage.into());
+ return Err(SessionError::InvalidMessage);
}
};
- // We don't currently have a use case to interact directly with the queue,
- // however, we need to declare a queue for a given device
+ let mut args = FieldTable::default();
+ args.insert("x-max-priority".into(), MAX_RMQ_MSG_PRIORITY.into());
amqp_channel
.queue_declare(
&device_info.device_id,
QueueDeclareOptions::default(),
- FieldTable::default(),
+ args,
)
.await?;
+ publish_persisted_messages(&db_client, amqp_channel, &device_info).await?;
+
let amqp_consumer = amqp_channel
.basic_consume(
&device_info.device_id,
- "tunnelbroker",
+ RMQ_CONSUMER_TAG,
BasicConsumeOptions::default(),
FieldTable::default(),
)
@@ -177,7 +214,7 @@
&message_request.device_id,
BasicPublishOptions::default(),
message_request.payload.as_bytes(),
- BasicProperties::default(),
+ BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
)
.await;
@@ -223,41 +260,6 @@
self.amqp_consumer.next().await
}
- pub async fn deliver_persisted_messages(
- &mut self,
- ) -> Result<(), SessionError> {
- // Check for persisted messages
- let messages = self
- .db_client
- .retrieve_messages(&self.device_info.device_id)
- .await
- .unwrap_or_else(|e| {
- error!("Error while retrieving messages: {}", e);
- Vec::new()
- });
-
- for message in messages {
- let device_message = DeviceMessage::from_hashmap(message)?;
- self
- .send_message_to_device(Message::Text(device_message.payload))
- .await;
- if let Err(e) = self
- .db_client
- .delete_message(&self.device_info.device_id, &device_message.message_id)
- .await
- {
- error!("Failed to delete message: {}:", e);
- }
- }
-
- debug!(
- "Flushed messages for device: {}",
- &self.device_info.device_id
- );
-
- Ok(())
- }
-
pub async fn send_message_to_device(&mut self, message: Message) {
if let Err(e) = self.tx.send(message).await {
error!("Failed to send message to device: {}", e);
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jan 15, 11:22 AM (7 h, 6 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5937518
Default Alt Text
D9571.1768476150.diff (7 KB)
Attached To
Mode
D9571: [Tunnelbroker] publish messages from DDB to RabbitMQ when client connects
Attached
Detach File
Event Timeline
Log In to Comment