Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32168891
D15370.1765055326.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
10 KB
Referenced Files
None
Subscribers
None
D15370.1765055326.diff
View Options
diff --git a/services/tunnelbroker/src/amqp_client/amqp.rs b/services/tunnelbroker/src/amqp_client/amqp.rs
--- a/services/tunnelbroker/src/amqp_client/amqp.rs
+++ b/services/tunnelbroker/src/amqp_client/amqp.rs
@@ -1,18 +1,12 @@
-use crate::constants::{error_types, CLIENT_RMQ_MSG_PRIORITY};
-use crate::database::DatabaseClient;
+use crate::constants::error_types;
use crate::CONFIG;
-use comm_lib::aws::ddb::error::SdkError;
-use comm_lib::aws::ddb::operation::put_item::PutItemError;
use comm_lib::database::batch_operations::ExponentialBackoffConfig;
-use lapin::{options::BasicPublishOptions, BasicProperties};
use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
-use tunnelbroker_messages::MessageToDevice;
-use uuid;
static AMQP_URI: Lazy<AMQPUri> = Lazy::new(|| {
let mut amqp_uri = CONFIG
@@ -137,6 +131,7 @@
///
/// TODO: Add support for restoring channel topology (queues and consumers)
/// (`lapin` has this built-in, but it's an internal crate feature)
+#[derive(Clone)]
pub struct AmqpChannel {
conn: AmqpConnection,
channel: Arc<Mutex<Option<lapin::Channel>>>,
@@ -204,48 +199,3 @@
fn from_env(var_name: &str) -> Option<String> {
std::env::var(var_name).ok().filter(|s| !s.is_empty())
}
-
-#[derive(
- Debug, derive_more::Display, derive_more::From, derive_more::Error,
-)]
-pub enum SendMessageError {
- PersistenceError(SdkError<PutItemError>),
- SerializationError(serde_json::Error),
- AmqpError(lapin::Error),
-}
-pub async fn send_message_to_device(
- database_client: &DatabaseClient,
- amqp_channel: &AmqpChannel,
- device_id: String,
- payload: String,
-) -> Result<(), SendMessageError> {
- debug!("Received message for {}", &device_id);
-
- let client_message_id = uuid::Uuid::new_v4().to_string();
-
- let message_id = database_client
- .persist_message(&device_id, &payload, &client_message_id)
- .await?;
-
- let message_to_device = MessageToDevice {
- device_id: device_id.clone(),
- payload,
- message_id,
- };
-
- let serialized_message = serde_json::to_string(&message_to_device)?;
-
- amqp_channel
- .get()
- .await?
- .basic_publish(
- "",
- &device_id,
- BasicPublishOptions::default(),
- serialized_message.as_bytes(),
- BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
- )
- .await?;
-
- Ok(())
-}
diff --git a/services/tunnelbroker/src/amqp_client/mod.rs b/services/tunnelbroker/src/amqp_client/mod.rs
--- a/services/tunnelbroker/src/amqp_client/mod.rs
+++ b/services/tunnelbroker/src/amqp_client/mod.rs
@@ -17,6 +17,7 @@
use tunnelbroker_messages::{MessageToDevice, MessageToDeviceRequest};
pub mod amqp;
+pub mod utils;
pub struct AmqpClient {
db_client: DatabaseClient,
diff --git a/services/tunnelbroker/src/amqp_client/utils.rs b/services/tunnelbroker/src/amqp_client/utils.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/amqp_client/utils.rs
@@ -0,0 +1,108 @@
+use comm_lib::aws::ddb::{error::SdkError, operation::put_item::PutItemError};
+use lapin::{options::BasicPublishOptions, BasicProperties};
+use tunnelbroker_messages::{MessageToDevice, MessageToDeviceRequest};
+
+use crate::{constants::CLIENT_RMQ_MSG_PRIORITY, database::DatabaseClient};
+
+use super::amqp::AmqpChannel;
+
+#[derive(
+ Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum SendMessageError { // Error returned by `send_message_to_device`; `derive_more::From` lets `?` wrap each source error.
+ PersistenceError(SdkError<PutItemError>), // presumably raised by `persist_message` (the only database call in the send path) - confirm
+ SerializationError(serde_json::Error), // `serde_json::to_string` failed on the outgoing `MessageToDevice`
+ AmqpError(lapin::Error), // presumably from acquiring the channel or from `basic_publish` - confirm against `AmqpChannel::get`
+}
+
+#[derive(Clone)]
+pub struct BasicMessageSender { // Bundles the two handles the free `send_message_to_device` function needs, so callers carry one value.
+ db_client: DatabaseClient, // forwarded as `database_client` on every send
+ amqp_channel: AmqpChannel, // forwarded as `amqp_channel` on every send
+}
+
+impl BasicMessageSender {
+ pub fn new( // Builds a sender from a borrowed database client and an owned AMQP channel.
+ database_client: &DatabaseClient,
+ amqp_channel: AmqpChannel,
+ ) -> Self {
+ Self {
+ db_client: database_client.clone(), // the sender keeps its own clone; the caller retains its client
+ amqp_channel,
+ }
+ }
+
+ pub async fn send_message_to_device( // Sends `message_request` by delegating to the free `send_message_to_device` function.
+ &self,
+ message_request: &MessageToDeviceRequest,
+ ) -> Result<(), SendMessageError> {
+ let MessageToDeviceRequest { // destructure the borrowed request into references to its three fields
+ client_message_id,
+ device_id,
+ payload,
+ } = message_request;
+
+ send_message_to_device(
+ &self.db_client,
+ &self.amqp_channel,
+ device_id.clone(), // the free function takes owned `String`s, hence the clones
+ payload.clone(),
+ Some(client_message_id), // reuse the id carried by the request instead of generating one
+ )
+ .await
+ }
+
+ pub async fn simple_send_message_to_device( // Convenience wrapper: sends `payload` to `recipient_device_id` without a caller-supplied request.
+ &self,
+ recipient_device_id: &str,
+ payload: String,
+ ) -> Result<(), SendMessageError> {
+ self
+ .send_message_to_device(&tunnelbroker_messages::MessageToDeviceRequest {
+ client_message_id: uuid::Uuid::new_v4().to_string(), // a fresh v4 UUID serves as the client message id
+ device_id: recipient_device_id.to_string(),
+ payload,
+ })
+ .await
+ }
+}
+
+pub async fn send_message_to_device( // Persists `payload` for `device_id` via `database_client`, then publishes it on `amqp_channel`.
+ database_client: &DatabaseClient,
+ amqp_channel: &AmqpChannel,
+ device_id: String,
+ payload: String,
+ client_message_id: Option<&String>, // `None` makes the function generate a v4 UUID below
+) -> Result<(), SendMessageError> { // `Ok(())` after a successful publish; otherwise the error of the first failing step
+ tracing::debug!("Received message for {}", &device_id);
+
+ let client_message_id = client_message_id // use the caller's id when given, else a fresh UUID
+ .cloned()
+ .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
+
+ let message_id = database_client // persist before publishing; the returned id is embedded in the outgoing message
+ .persist_message(&device_id, &payload, &client_message_id)
+ .await?;
+
+ let message_to_device = MessageToDevice {
+ device_id: device_id.clone(), // cloned because `device_id` is borrowed again below for the publish call
+ payload,
+ message_id,
+ };
+
+ let serialized_message = serde_json::to_string(&message_to_device)?;
+
+ amqp_channel
+ .get()
+ .await?
+ .basic_publish(
+ "", // exchange argument left empty - presumably the broker's default exchange; confirm
+ &device_id, // routing key: the recipient's device id
+ BasicPublishOptions::default(),
+ serialized_message.as_bytes(),
+ BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), // default properties plus the client message priority
+ )
+ .await?;
+
+ Ok(())
+}
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -2,9 +2,8 @@
tonic::include_proto!("tunnelbroker");
}
-use crate::amqp_client::amqp::{
- send_message_to_device, AmqpChannel, AmqpConnection, SendMessageError,
-};
+use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection};
+use crate::amqp_client::utils::{send_message_to_device, SendMessageError};
use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
use crate::database::{handle_ddb_error, DatabaseClient};
use crate::{constants, CONFIG};
@@ -44,6 +43,7 @@
&self.amqp_channel,
message.device_id,
message.payload,
+ None,
)
.await
.map_err(|e| match e {
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -1,6 +1,5 @@
-use crate::amqp_client::amqp::{
- send_message_to_device, AmqpChannel, AmqpConnection,
-};
+use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection};
+use crate::amqp_client::utils::BasicMessageSender;
use crate::config::CONFIG;
use crate::constants::MEDIA_MIRROR_TIMEOUT;
use crate::database::DatabaseClient;
@@ -13,6 +12,7 @@
use futures_util::{SinkExt, StreamExt};
use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient;
use grpc_clients::identity::protos::auth::PeersDeviceListsRequest;
+use grpc_clients::identity::DeviceType;
use lapin::{options::*, types::FieldTable, ExchangeKind};
use std::time::Duration;
use tokio::time::{interval, Instant};
@@ -31,7 +31,7 @@
token_data: String,
amqp_connection: AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
- amqp_channel: AmqpChannel,
+ message_sender: BasicMessageSender,
blob_client: S2SAuthedBlobClient,
}
@@ -46,6 +46,9 @@
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: &AuthService,
) {
+ let message_sender =
+ BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection));
+
let connection = Self {
db: db.clone(),
config: config.clone(),
@@ -53,11 +56,11 @@
token_data,
amqp_connection: amqp_connection.clone(),
grpc_client,
- amqp_channel: AmqpChannel::new(&amqp_connection),
blob_client: S2SAuthedBlobClient::new(
auth_service,
CONFIG.blob_service_url.clone(),
),
+ message_sender,
};
tokio::spawn(async move {
@@ -543,28 +546,35 @@
message: direct_cast_message,
};
if let Some(user_devices) = user_devices_option {
- for (device_id, platform_details) in
- &user_devices.devices_platform_details
- {
- let payload = serde_json::to_string(&message).map_err(|e| {
- TokenConnectionError::MessageHandlingFailed(format!(
- "Failed to serialize: {}",
- e
- ))
- })?;
- send_message_to_device(
- &self.db,
- &self.amqp_channel,
- device_id.to_string(),
- payload,
- )
- .await
- .map_err(|e| {
- TokenConnectionError::MessageHandlingFailed(format!(
- "Failed to send a message: {}",
- e
- ))
- })?;
+ let message_payload = serde_json::to_string(&message).map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to serialize: {}",
+ e
+ ))
+ })?;
+
+ // Filter out keyservers
+ let device_ids = user_devices.devices_platform_details.iter().filter_map(
+ |(device_id, platform_details)| {
+ if platform_details.device_type() == DeviceType::Keyserver {
+ None
+ } else {
+ Some(device_id)
+ }
+ },
+ );
+
+ for device_id in device_ids {
+ self
+ .message_sender
+ .simple_send_message_to_device(device_id, message_payload.clone())
+ .await
+ .map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to send a message: {}",
+ e
+ ))
+ })?;
}
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 6, 9:08 PM (19 h, 49 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5840027
Default Alt Text
D15370.1765055326.diff (10 KB)
Attached To
Mode
D15370: [tunnelbroker] Extract utility to send messages
Attached
Detach File
Event Timeline
Log In to Comment