diff --git a/services/tunnelbroker/src/amqp_client/amqp.rs b/services/tunnelbroker/src/amqp_client/amqp.rs
--- a/services/tunnelbroker/src/amqp_client/amqp.rs
+++ b/services/tunnelbroker/src/amqp_client/amqp.rs
@@ -1,12 +1,15 @@
 use comm_lib::database::batch_operations::ExponentialBackoffConfig;
+use lapin::{options::BasicPublishOptions, BasicProperties};
 use lapin::{uri::AMQPUri, ConnectionProperties};
 use once_cell::sync::Lazy;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{Mutex, RwLock};
 use tracing::{debug, error, info, warn};
+use tunnelbroker_messages::MessageToDevice;
 
-use crate::constants::error_types;
+use crate::constants::{error_types, CLIENT_RMQ_MSG_PRIORITY};
+use crate::database::DatabaseClient;
 use crate::CONFIG;
 
 static AMQP_URI: Lazy<AMQPUri> = Lazy::new(|| {
@@ -199,3 +202,54 @@
 fn from_env(var_name: &str) -> Option<String> {
   std::env::var(var_name).ok().filter(|s| !s.is_empty())
 }
+
+/// Persists `payload` for `device_id` and then publishes it over AMQP for
+/// that device.
+///
+/// The message is first stored through `database_client`; the returned
+/// message ID is then wrapped, together with the payload, in a
+/// `MessageToDevice` that is serialized to JSON and published on the default
+/// exchange with the device ID as routing key.
+///
+/// # Errors
+///
+/// Returns the boxed error of the first step that fails: persisting,
+/// serializing, acquiring the channel, or publishing.
+pub async fn send_message_to_device(
+  database_client: &DatabaseClient,
+  amqp_channel: &AmqpChannel,
+  device_id: String,
+  payload: String,
+) -> Result<(), Box<dyn std::error::Error>> {
+  debug!("Received message for {}", device_id);
+
+  let client_message_id = uuid::Uuid::new_v4().to_string();
+
+  // `?` boxes each concrete error via the standard `From` conversion, so no
+  // explicit `map_err` casts are needed.
+  let message_id = database_client
+    .persist_message(&device_id, &payload, &client_message_id)
+    .await?;
+
+  let message_to_device = MessageToDevice {
+    device_id,
+    payload,
+    message_id,
+  };
+
+  let serialized_message = serde_json::to_string(&message_to_device)?;
+
+  amqp_channel
+    .get()
+    .await?
+    .basic_publish(
+      "",
+      &message_to_device.device_id,
+      BasicPublishOptions::default(),
+      serialized_message.as_bytes(),
+      BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
+    )
+    .await?;
+
+  Ok(())
+}
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -2,19 +2,20 @@
   tonic::include_proto!("tunnelbroker");
 }
 
-use lapin::{options::BasicPublishOptions, BasicProperties};
+use crate::amqp_client::amqp::{
+  send_message_to_device, AmqpChannel, AmqpConnection,
+};
+use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
+use crate::database::DatabaseClient;
+use crate::{constants, CONFIG};
+use lapin::options::BasicPublishOptions;
+use lapin::BasicProperties;
 use proto::tunnelbroker_service_server::{
   TunnelbrokerService, TunnelbrokerServiceServer,
 };
 use proto::Empty;
 use tonic::transport::Server;
 use tracing::debug;
-use tunnelbroker_messages::MessageToDevice;
-
-use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection};
-use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
-use crate::database::{handle_ddb_error, DatabaseClient};
-use crate::{constants, CONFIG};
 
 struct TunnelbrokerGRPC {
   client: DatabaseClient,
@@ -38,39 +39,17 @@
   ) -> Result<tonic::Response<Empty>, tonic::Status> {
     let message = request.into_inner();
 
-    debug!("Received message for {}", &message.device_id);
-
-    let client_message_id = uuid::Uuid::new_v4().to_string();
-
-    let message_id = self
-      .client
-      .persist_message(&message.device_id, &message.payload, &client_message_id)
-      .await
-      .map_err(handle_ddb_error)?;
-
-    let message_to_device = MessageToDevice {
-      device_id: message.device_id.clone(),
-      payload: message.payload,
-      message_id,
-    };
-
-    let serialized_message = serde_json::to_string(&message_to_device)
-      .map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?;
-
-    self
-      .amqp_channel
-      .get()
-      .await
-      .map_err(handle_amqp_error)?
-      .basic_publish(
-        "",
-        &message.device_id,
-        BasicPublishOptions::default(),
-        serialized_message.as_bytes(),
-        BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
-      )
-      .await
-      .map_err(handle_amqp_error)?;
+    send_message_to_device(
+      &self.client,
+      &self.amqp_channel,
+      message.device_id,
+      message.payload,
+    )
+    .await
+    .map_err(|e| {
+      tracing::error!("Failed to send message to device: {}", e);
+      tonic::Status::internal("Internal Error")
+    })?;
 
     let response = tonic::Response::new(Empty {});
     Ok(response)
diff --git a/services/tunnelbroker/src/token_distributor/error.rs b/services/tunnelbroker/src/token_distributor/error.rs
--- a/services/tunnelbroker/src/token_distributor/error.rs
+++ b/services/tunnelbroker/src/token_distributor/error.rs
@@ -22,6 +22,10 @@
   Cancelled,
   #[display(fmt = "AMQP setup failed: {}", _0)]
   AmqpSetupFailed(String),
+  #[display(fmt = "Message parsing failed: {}", _0)]
+  MessageParsingFailed(String),
+  #[display(fmt = "Message delivery failed: {}", _0)]
+  MessageDeliveryFailed(String),
 }
 
 impl std::error::Error for TokenConnectionError {
@@ -45,3 +49,17 @@
     TokenConnectionError::DatabaseError(error)
   }
 }
+
+impl From<serde_json::Error> for TokenConnectionError {
+  fn from(error: serde_json::Error) -> Self {
+    TokenConnectionError::MessageParsingFailed(error.to_string())
+  }
+}
+
+// Boxed errors are produced by `send_message_to_device`, so they are
+// reported as delivery failures rather than parsing failures.
+impl From<Box<dyn std::error::Error>> for TokenConnectionError {
+  fn from(error: Box<dyn std::error::Error>) -> Self {
+    TokenConnectionError::MessageDeliveryFailed(error.to_string())
+  }
+}
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -1,16 +1,21 @@
-use crate::amqp_client::amqp::AmqpConnection;
+use crate::amqp_client::amqp::{
+  send_message_to_device, AmqpChannel, AmqpConnection,
+};
 use crate::database::DatabaseClient;
 use crate::token_distributor::config::TokenDistributorConfig;
 use crate::token_distributor::error::TokenConnectionError;
 use futures_util::{SinkExt, StreamExt};
 use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient;
+use grpc_clients::identity::protos::authenticated::PeersDeviceListsRequest;
 use lapin::{options::*, types::FieldTable, ExchangeKind};
 use std::time::Duration;
 use tokio::time::{interval, Instant};
 use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, trace, warn};
-use tunnelbroker_messages::farcaster::FarcasterMessage;
+use tunnelbroker_messages::farcaster::{
+  FarcasterMessage, NewFarcasterMessage, RefreshDirectCastConversationPayload,
+};
 
 pub(crate) struct TokenConnection {
   db: DatabaseClient,
@@ -19,6 +24,7 @@
   token_data: String,
   amqp_connection: AmqpConnection,
   grpc_client: ChainedInterceptedServicesAuthClient,
+  amqp_channel: AmqpChannel,
 }
 
 impl TokenConnection {
@@ -36,8 +42,9 @@
       config: config.clone(),
       user_id: user_id.clone(),
       token_data,
-      amqp_connection,
+      amqp_connection: amqp_connection.clone(),
       grpc_client,
+      amqp_channel: AmqpChannel::new(&amqp_connection),
     };
 
     tokio::spawn(async move {
@@ -58,6 +65,12 @@
         TokenConnectionError::HeartbeatFailed(_) => "HeartbeatFailed",
         TokenConnectionError::Cancelled => "Cancelled",
         TokenConnectionError::AmqpSetupFailed(_) => "AmqpSetupFailed",
+        TokenConnectionError::MessageParsingFailed(_) => {
+          "MessageParsingFailed"
+        }
+        TokenConnectionError::MessageDeliveryFailed(_) => {
+          "MessageDeliveryFailed"
+        }
       };
 
       info!(
@@ -220,6 +233,8 @@
       self.config.ping_timeout.as_secs()
     );
 
+    let mut client = self.grpc_client.clone();
+
     loop {
       tokio::select! {
         // Handle AMQP messages and forward to WebSocket
@@ -259,9 +274,11 @@
 
               match farcaster_msg.message_type.as_str() {
                 "refresh-direct-cast-conversation" => {
-                  debug!("Processing refresh-direct-cast-conversation message");
-                  if let Some(payload) = &farcaster_msg.payload {
-                    debug!("Conversation payload: {}", payload);
+                  if let Err(e) = self.handle_refresh_conversation(
+                    farcaster_msg.payload.as_ref(),
+                    &mut client,
+                  ).await {
+                    error!("Failed to handle refresh direct cast conversation: {:?}", e);
                   }
                 }
                 "refresh-self-direct-casts-inbox" => {
@@ -437,4 +454,86 @@
     );
     Ok(consumer)
   }
+
+  /// Handles a `refresh-direct-cast-conversation` event by forwarding its
+  /// direct cast message to every device registered for this user.
+  ///
+  /// Fails with `MessageParsingFailed` when the payload is missing or
+  /// malformed, and with `MessageDeliveryFailed` when the device list cannot
+  /// be fetched or a message cannot be handed to a device. Forwarding stops
+  /// at the first device that fails.
+  async fn handle_refresh_conversation(
+    &self,
+    payload: Option<&serde_json::Value>,
+    client: &mut ChainedInterceptedServicesAuthClient,
+  ) -> Result<(), TokenConnectionError> {
+    debug!(
+      "Handling refresh direct cast conversation for user: {}",
+      self.user_id
+    );
+
+    let payload_value = payload.ok_or_else(|| {
+      warn!("No payload provided for refresh direct cast conversation");
+      TokenConnectionError::MessageParsingFailed("Missing payload".to_string())
+    })?;
+    let parsed_payload: RefreshDirectCastConversationPayload =
+      serde_json::from_value(payload_value.clone()).map_err(|e| {
+        warn!(
+          "Failed to parse RefreshDirectCastConversation payload: {}",
+          e
+        );
+        TokenConnectionError::MessageParsingFailed(format!(
+          "Invalid payload: {}",
+          e
+        ))
+      })?;
+
+    debug!(
+      "Processing refresh for conversation ID: {}",
+      parsed_payload.conversation_id
+    );
+
+    let request = PeersDeviceListsRequest {
+      user_ids: vec![self.user_id.clone()],
+    };
+    // A failed RPC is returned to the caller as an error instead of
+    // panicking inside the connection loop.
+    let user_devices_response = client
+      .get_device_lists_for_users(request)
+      .await
+      .map_err(|status| {
+        TokenConnectionError::MessageDeliveryFailed(format!(
+          "GetDeviceListsForUser RPC failed: {:?}",
+          status
+        ))
+      })?
+      .into_inner();
+
+    let user_devices = match user_devices_response
+      .users_devices_platform_details
+      .get(&self.user_id)
+    {
+      Some(user_devices) => user_devices,
+      // No device list for this user, so there is nothing to forward.
+      None => return Ok(()),
+    };
+
+    // The payload is the same for every device, so it is serialized once.
+    let message = NewFarcasterMessage {
+      message: parsed_payload.message,
+    };
+    let serialized_message = serde_json::to_string(&message)?;
+
+    for device_id in user_devices.devices_platform_details.keys() {
+      send_message_to_device(
+        &self.db,
+        &self.amqp_channel,
+        device_id.clone(),
+        serialized_message.clone(),
+      )
+      .await?;
+    }
+
+    Ok(())
+  }
 }
diff --git a/shared/tunnelbroker_messages/src/messages/farcaster.rs b/shared/tunnelbroker_messages/src/messages/farcaster.rs
--- a/shared/tunnelbroker_messages/src/messages/farcaster.rs
+++ b/shared/tunnelbroker_messages/src/messages/farcaster.rs
@@ -151,3 +151,11 @@
   #[serde(flatten)]
   pub extra: serde_json::Map<String, serde_json::Value>,
 }
+
+/// Message sent to a user's devices to deliver a new Farcaster direct cast
+/// message.
+#[derive(Serialize, Deserialize, TagAwareDeserialize, PartialEq, Debug)]
+#[serde(tag = "type", remote = "Self", rename_all = "camelCase")]
+pub struct NewFarcasterMessage {
+  pub message: DirectCastMessage,
+}