diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -102,6 +102,7 @@ &amqp_connection, grpc_client, &auth_service, + farcaster_client.clone(), ); tokio::select! { diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -1,9 +1,11 @@ mod config; mod error; +mod notif_utils; mod token_connection; use crate::constants::error_types; use crate::database::DatabaseClient; +use crate::farcaster::FarcasterClient; pub(crate) use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::token_connection::TokenConnection; use crate::{amqp_client::amqp::AmqpConnection, log::redact_sensitive_data}; @@ -23,6 +25,7 @@ amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, auth_service: AuthService, + farcaster_client: FarcasterClient, } impl TokenDistributor { @@ -32,6 +35,7 @@ amqp_connection: &AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, auth_service: &AuthService, + farcaster_client: FarcasterClient, ) -> Self { info!( "Initializing TokenDistributor - max_connections: {}, \ @@ -61,6 +65,7 @@ amqp_connection: amqp_connection.clone(), grpc_client, auth_service: auth_service.clone(), + farcaster_client, } } @@ -202,6 +207,7 @@ cancel_token.clone(), self.grpc_client.clone(), &self.auth_service, + self.farcaster_client.clone(), ); // Store the cancellation token diff --git a/services/tunnelbroker/src/token_distributor/notif_utils.rs b/services/tunnelbroker/src/token_distributor/notif_utils.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/token_distributor/notif_utils.rs @@ -0,0 +1,71 @@ +use tunnelbroker_messages::farcaster::{ + DirectCastConversation, DirectCastMessageType, + RefreshDirectCastConversationPayload, +}; + +use crate::notifs::GenericNotifPayload; + +pub fn prepare_notif_payload( + payload: &RefreshDirectCastConversationPayload, + conversation: &DirectCastConversation, + recipient_fid: Option<&String>, +) -> Option { + let RefreshDirectCastConversationPayload { + conversation_id, + message, + .. + } = payload; + + if conversation.muted { + // TODO: badge only? + return None; + } + if message.message_type != DirectCastMessageType::Text { + return None; + } + + // Don't send a notif from self + if recipient_fid.is_some_and(|fid| *fid == message.sender_fid.to_string()) { + return None; + } + + let message_metadata = message.extra.get("metadata"); + let has_photos = + message_metadata.is_some_and(|metadata| metadata["medias"].is_array()); + let has_videos = + message_metadata.is_some_and(|metadata| metadata["videos"].is_array()); + + let title = conversation + .name + .as_deref() + .or_else(|| { + conversation + .participant(message.sender_fid) + .map(|u| u.display_name.as_str()) + }) + .unwrap_or("Farcaster"); + + let body = if has_photos { + "[Photo message]" + } else if has_videos { + "[Video message]" + } else { + message.message.as_str() + }; + + Some(GenericNotifPayload { + title: trim_text(title, 100), + body: trim_text(body, 300), + thread_id: format!("FARCASTER#{}", conversation_id), + }) +} + +fn trim_text(text: &str, max_length: usize) -> String { + if text.len() <= max_length { + return text.to_string(); + } else if max_length <= 3 { + return text[0..max_length].to_string(); + } + let substr = text[0..(max_length - 3)].to_string(); + format!("{}...", substr) +} diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -3,9 +3,12 @@ use crate::config::CONFIG; use crate::constants::MEDIA_MIRROR_TIMEOUT; use crate::database::DatabaseClient; +use crate::farcaster::FarcasterClient; use crate::log::redact_sensitive_data; +use crate::notifs::{GenericNotifClient, NotifRecipientDescriptor}; use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::error::TokenConnectionError; +use crate::token_distributor::notif_utils::prepare_notif_payload; use comm_lib::auth::AuthService; use comm_lib::blob::client::S2SAuthedBlobClient; use comm_lib::blob::types::http::MirroredMediaInfo; @@ -32,6 +35,8 @@ grpc_client: ChainedInterceptedServicesAuthClient, message_sender: BasicMessageSender, blob_client: S2SAuthedBlobClient, + farcaster_client: FarcasterClient, + notif_client: GenericNotifClient, } impl TokenConnection { @@ -44,6 +49,7 @@ cancellation_token: CancellationToken, grpc_client: ChainedInterceptedServicesAuthClient, auth_service: &AuthService, + farcaster_client: FarcasterClient, ) { let message_sender = BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection)); @@ -59,6 +65,8 @@ auth_service, CONFIG.blob_service_url.clone(), ), + farcaster_client, + notif_client: GenericNotifClient::new(db.clone(), message_sender.clone()), message_sender, }; @@ -502,6 +510,18 @@ ); let conversation_id = &payload.conversation_id; + let conversation = self + .farcaster_client + .fetch_conversation(&self.user_id, conversation_id) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to fetch conversation details: {e:?}", + )) + })?; + let current_user_fid = extract_fid_from_dcs_token(&self.token_data); + let notif = + prepare_notif_payload(payload, &conversation, current_user_fid.as_ref()); debug!( "Processing refresh for conversation ID: {}", @@ -538,21 +558,45 @@ })? .into_inner(); - let user_devices_option = user_devices_response + let Some(user_devices) = user_devices_response .users_devices_platform_details - .get(&self.user_id); + .get(&self.user_id) + else { + return Ok(()); + }; + let message = NewFarcasterMessage { message: direct_cast_message, }; - if let Some(user_devices) = user_devices_option { - let message_payload = serde_json::to_string(&message).map_err(|e| { - TokenConnectionError::MessageHandlingFailed(format!( - "Failed to serialize: {}", - e - )) - })?; + let message_payload = serde_json::to_string(&message).map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to serialize: {}", + e + )) + })?; + + for (device_id, platform_details) in &user_devices.devices_platform_details + { + let notif_future = async || { + let Some(notif_payload) = ¬if else { + return; + }; + let target = NotifRecipientDescriptor { + platform: platform_details.device_type().into(), + device_id: device_id.to_string(), + }; + if let Err(err) = self + .notif_client + .send_notif(notif_payload.clone(), target) + .await + { + if !err.is_invalid_token() { + tracing::error!("Failed to send Farcaster notif: {:?}", err); + } + } + }; - for device_id in user_devices.devices_platform_details.keys() { + let message_future = async || { self .message_sender .simple_send_message_to_device(device_id, message_payload.clone()) @@ -562,8 +606,11 @@ "Failed to send a message: {}", e )) - })?; - } + }) + }; + + let (message_result, _) = tokio::join!(message_future(), notif_future()); + message_result?; } Ok(()) @@ -632,3 +679,21 @@ Some(found_medias) } + +/// Farcaster DCs token string has specific format, from which we can extract +/// authenticated user's FID. +/// The pattern is as follows: `fc_dc_${FID}_remainingtokenvalue` +fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option { + const PREFIX: &str = "fc_dc_"; + const SEP: &str = "_"; + + let stripped = fc_dcs_token.strip_prefix(PREFIX)?; + let (fid, _rest) = stripped.split_once(SEP)?; + + let is_valid = fid.chars().all(|c: char| c.is_ascii_digit()); + if !is_valid { + return None; + } + + Some(fid.to_string()) +}