diff --git a/services/tunnelbroker/src/amqp_client/amqp.rs b/services/tunnelbroker/src/amqp_client/amqp.rs --- a/services/tunnelbroker/src/amqp_client/amqp.rs +++ b/services/tunnelbroker/src/amqp_client/amqp.rs @@ -1,13 +1,18 @@ +use crate::constants::{error_types, CLIENT_RMQ_MSG_PRIORITY}; +use crate::database::DatabaseClient; +use crate::CONFIG; +use comm_lib::aws::ddb::error::SdkError; +use comm_lib::aws::ddb::operation::put_item::PutItemError; use comm_lib::database::batch_operations::ExponentialBackoffConfig; +use lapin::{options::BasicPublishOptions, BasicProperties}; use lapin::{uri::AMQPUri, ConnectionProperties}; use once_cell::sync::Lazy; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, info, warn}; - -use crate::constants::error_types; -use crate::CONFIG; +use tunnelbroker_messages::MessageToDevice; +use uuid; static AMQP_URI: Lazy = Lazy::new(|| { let mut amqp_uri = CONFIG @@ -199,3 +204,48 @@ fn from_env(var_name: &str) -> Option { std::env::var(var_name).ok().filter(|s| !s.is_empty()) } + +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +pub enum SendMessageError { + PersistenceError(SdkError), + SerializationError(serde_json::Error), + AmqpError(lapin::Error), +} +pub async fn send_message_to_device( + database_client: &DatabaseClient, + amqp_channel: &AmqpChannel, + device_id: String, + payload: String, +) -> Result<(), SendMessageError> { + debug!("Received message for {}", &device_id); + + let client_message_id = uuid::Uuid::new_v4().to_string(); + + let message_id = database_client + .persist_message(&device_id, &payload, &client_message_id) + .await?; + + let message_to_device = MessageToDevice { + device_id: device_id.clone(), + payload, + message_id, + }; + + let serialized_message = serde_json::to_string(&message_to_device)?; + + amqp_channel + .get() + .await? + .basic_publish( + "", + &device_id, + BasicPublishOptions::default(), + serialized_message.as_bytes(), + BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), + ) + .await?; + + Ok(()) +} diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -2,19 +2,20 @@ tonic::include_proto!("tunnelbroker"); } -use lapin::{options::BasicPublishOptions, BasicProperties}; +use crate::amqp_client::amqp::{ + send_message_to_device, AmqpChannel, AmqpConnection, SendMessageError, +}; +use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG}; +use crate::database::{handle_ddb_error, DatabaseClient}; +use crate::{constants, CONFIG}; +use lapin::options::BasicPublishOptions; +use lapin::BasicProperties; use proto::tunnelbroker_service_server::{ TunnelbrokerService, TunnelbrokerServiceServer, }; use proto::Empty; use tonic::transport::Server; use tracing::debug; -use tunnelbroker_messages::MessageToDevice; - -use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection}; -use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG}; -use crate::database::{handle_ddb_error, DatabaseClient}; -use crate::{constants, CONFIG}; struct TunnelbrokerGRPC { client: DatabaseClient, @@ -38,39 +39,24 @@ ) -> Result, tonic::Status> { let message = request.into_inner(); - debug!("Received message for {}", &message.device_id); - - let client_message_id = uuid::Uuid::new_v4().to_string(); - - let message_id = self - .client - .persist_message(&message.device_id, &message.payload, &client_message_id) - .await - .map_err(handle_ddb_error)?; - - let message_to_device = MessageToDevice { - device_id: message.device_id.clone(), - payload: message.payload, - message_id, - }; - - let serialized_message = serde_json::to_string(&message_to_device) - .map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?; - - self - .amqp_channel - .get() - .await - .map_err(handle_amqp_error)? - .basic_publish( - "", - &message.device_id, - BasicPublishOptions::default(), - serialized_message.as_bytes(), - BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), - ) - .await - .map_err(handle_amqp_error)?; + send_message_to_device( + &self.client, + &self.amqp_channel, + message.device_id, + message.payload, + ) + .await + .map_err(|e| match e { + SendMessageError::PersistenceError(ddb_error) => { + handle_ddb_error(ddb_error) + } + SendMessageError::SerializationError(_) => { + tonic::Status::invalid_argument("Invalid argument") + } + SendMessageError::AmqpError(lapin_error) => { + handle_amqp_error(lapin_error) + } + })?; let response = tonic::Response::new(Empty {}); Ok(response) diff --git a/services/tunnelbroker/src/token_distributor/error.rs b/services/tunnelbroker/src/token_distributor/error.rs --- a/services/tunnelbroker/src/token_distributor/error.rs +++ b/services/tunnelbroker/src/token_distributor/error.rs @@ -22,6 +22,8 @@ Cancelled, #[display(fmt = "AMQP setup failed: {}", _0)] AmqpSetupFailed(String), + #[display(fmt = "Message handling failed: {}", _0)] + MessageHandlingFailed(String), } impl std::error::Error for TokenConnectionError { diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -1,16 +1,22 @@ -use crate::amqp_client::amqp::AmqpConnection; +use crate::amqp_client::amqp::{ + send_message_to_device, AmqpChannel, AmqpConnection, +}; use crate::database::DatabaseClient; use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::error::TokenConnectionError; use futures_util::{SinkExt, StreamExt}; use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; +use grpc_clients::identity::protos::auth::PeersDeviceListsRequest; use lapin::{options::*, types::FieldTable, ExchangeKind}; use std::time::Duration; use tokio::time::{interval, Instant}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; -use tunnelbroker_messages::farcaster::{FarcasterMessage, FarcasterPayload}; +use tunnelbroker_messages::farcaster::{ + FarcasterMessage, FarcasterPayload, NewFarcasterMessage, + RefreshDirectCastConversationPayload, +}; pub(crate) struct TokenConnection { db: DatabaseClient, @@ -19,6 +25,7 @@ token_data: String, amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, + amqp_channel: AmqpChannel, } impl TokenConnection { @@ -36,8 +43,9 @@ config: config.clone(), user_id: user_id.clone(), token_data, - amqp_connection, + amqp_connection: amqp_connection.clone(), grpc_client, + amqp_channel: AmqpChannel::new(&amqp_connection), }; tokio::spawn(async move { @@ -58,6 +66,9 @@ TokenConnectionError::HeartbeatFailed(_) => "HeartbeatFailed", TokenConnectionError::Cancelled => "Cancelled", TokenConnectionError::AmqpSetupFailed(_) => "AmqpSetupFailed", + TokenConnectionError::MessageHandlingFailed(_) => { + "MessageHandlingFailed" + } }; info!( @@ -220,6 +231,8 @@ self.config.ping_timeout.as_secs() ); + let mut client = self.grpc_client.clone(); + loop { tokio::select! { // Handle AMQP messages and forward to WebSocket @@ -252,12 +265,25 @@ match msg { Some(Ok(msg)) => match msg { Message::Text(text) => { - info!("Received message for {}: {}", self.user_id, text); + debug!("Received message for {}: {}", self.user_id, text); match serde_json::from_str::(&text) { Ok(farcaster_msg) => { match &farcaster_msg.payload { FarcasterPayload::RefreshDirectCastConversation { payload, .. } => { - debug!("Processing refresh-direct-cast-conversation message"); + if let Err(e) = self.handle_refresh_conversation( + payload, + &mut client, + ).await { + info!( + metricType = "TokenDistributor_ConnectionFailure", + metricValue = 1, + instanceId = self.config.instance_id, + userId = self.user_id, + errorType = "MessageHandlingFailed", + "Failed to handle refresh direct cast conversation: {:?}", + e + ); + } } FarcasterPayload::RefreshSelfDirectCastsInbox { payload, .. } => { debug!("Processing refresh-self-direct-casts-inbox message"); @@ -423,4 +449,71 @@ ); Ok(consumer) } + + async fn handle_refresh_conversation( + &self, + payload: &RefreshDirectCastConversationPayload, + client: &mut ChainedInterceptedServicesAuthClient, + ) -> Result<(), TokenConnectionError> { + debug!( + "Handling refresh direct cast conversation for user: {}", + self.user_id + ); + + let conversation_id = &payload.conversation_id; + let direct_cast_message = &payload.message; + + debug!( + "Processing refresh for conversation ID: {}", + conversation_id + ); + + let request = PeersDeviceListsRequest { + user_ids: vec![self.user_id.clone()], + }; + let user_devices_response = client + .get_device_lists_for_users(request) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to get user devices: {}", + e + )) + })? + .into_inner(); + + let user_devices_option = user_devices_response + .users_devices_platform_details + .get(&self.user_id); + let message = NewFarcasterMessage { + message: direct_cast_message.clone(), + }; + if let Some(user_devices) = user_devices_option { + for (device_id, platform_details) in + &user_devices.devices_platform_details + { + let payload = serde_json::to_string(&message).map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to serialize: {}", + e + )) + })?; + send_message_to_device( + &self.db, + &self.amqp_channel, + device_id.to_string(), + payload, + ) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to send a message: {}", + e + )) + })?; + } + } + + Ok(()) + } }