diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -102,6 +102,7 @@ &amqp_connection, grpc_client, &auth_service, + farcaster_client.clone(), ); tokio::select! { diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -1,9 +1,11 @@ mod config; mod error; +mod notif_utils; mod token_connection; use crate::constants::error_types; use crate::database::DatabaseClient; +use crate::farcaster::FarcasterClient; pub(crate) use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::token_connection::TokenConnection; use crate::{amqp_client::amqp::AmqpConnection, log::redact_sensitive_data}; @@ -23,6 +25,7 @@ amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, auth_service: AuthService, + farcaster_client: FarcasterClient, } impl TokenDistributor { @@ -32,6 +35,7 @@ amqp_connection: &AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, auth_service: &AuthService, + farcaster_client: FarcasterClient, ) -> Self { info!( "Initializing TokenDistributor - max_connections: {}, \ @@ -61,6 +65,7 @@ amqp_connection: amqp_connection.clone(), grpc_client, auth_service: auth_service.clone(), + farcaster_client, } } @@ -202,6 +207,7 @@ cancel_token.clone(), self.grpc_client.clone(), &self.auth_service, + self.farcaster_client.clone(), ); // Store the cancellation token diff --git a/services/tunnelbroker/src/token_distributor/notif_utils.rs b/services/tunnelbroker/src/token_distributor/notif_utils.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/token_distributor/notif_utils.rs @@ -0,0 +1,71 @@ +use tunnelbroker_messages::farcaster::{ + DirectCastConversation, DirectCastMessageType, + RefreshDirectCastConversationPayload, +}; + +use crate::notifs::GenericNotifPayload; + +pub fn prepare_notif_payload( + payload: &RefreshDirectCastConversationPayload, + conversation: &DirectCastConversation, + recipient_fid: Option<&String>, +) -> Option { + let RefreshDirectCastConversationPayload { + conversation_id, + message, + .. + } = payload; + + if conversation.muted { + // TODO: badge only? + return None; + } + if message.message_type != DirectCastMessageType::Text { + return None; + } + + // Don't send a notif from self + if recipient_fid.is_some_and(|fid| *fid == message.sender_fid.to_string()) { + return None; + } + + let message_metadata = message.extra.get("metadata"); + let has_photos = + message_metadata.is_some_and(|metadata| metadata["medias"].is_array()); + let has_videos = + message_metadata.is_some_and(|metadata| metadata["videos"].is_array()); + + let title = conversation + .name + .as_deref() + .or_else(|| { + conversation + .participant(message.sender_fid) + .map(|u| u.display_name.as_str()) + }) + .unwrap_or("Farcaster"); + + let body = if has_photos { + "[Photo message]" + } else if has_videos { + "[Video message]" + } else { + message.message.as_str() + }; + + Some(GenericNotifPayload { + title: trim_text(title, 100), + body: trim_text(body, 300), + thread_id: format!("FARCASTER#{}", conversation_id), + }) +} + +fn trim_text(text: &str, max_length: usize) -> String { + if text.len() <= max_length { + return text.to_string(); + } else if max_length <= 3 { + return text[0..max_length].to_string(); + } + let substr = text[0..(max_length - 3)].to_string(); + format!("{}...", substr) +} diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -3,15 +3,20 @@ use crate::config::CONFIG; use crate::constants::MEDIA_MIRROR_TIMEOUT; use crate::database::DatabaseClient; +use crate::farcaster::FarcasterClient; use crate::log::redact_sensitive_data; +use crate::notifs::{GenericNotifClient, NotifRecipientDescriptor}; use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::error::TokenConnectionError; +use crate::token_distributor::notif_utils::prepare_notif_payload; use comm_lib::auth::AuthService; use comm_lib::blob::client::S2SAuthedBlobClient; use comm_lib::blob::types::http::MirroredMediaInfo; use futures_util::{SinkExt, StreamExt}; use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; -use grpc_clients::identity::protos::auth::PeersDeviceListsRequest; +use grpc_clients::identity::protos::auth::{ + PeersDeviceListsRequest, PlatformDetails, +}; use grpc_clients::identity::DeviceType; use lapin::{options::*, types::FieldTable, ExchangeKind}; use std::time::Duration; @@ -33,6 +38,8 @@ grpc_client: ChainedInterceptedServicesAuthClient, message_sender: BasicMessageSender, blob_client: S2SAuthedBlobClient, + farcaster_client: FarcasterClient, + notif_client: GenericNotifClient, } impl TokenConnection { @@ -45,6 +52,7 @@ cancellation_token: CancellationToken, grpc_client: ChainedInterceptedServicesAuthClient, auth_service: &AuthService, + farcaster_client: FarcasterClient, ) { let message_sender = BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection)); @@ -60,6 +68,8 @@ auth_service, CONFIG.blob_service_url.clone(), ), + farcaster_client, + notif_client: GenericNotifClient::new(db.clone(), message_sender.clone()), message_sender, }; @@ -503,68 +513,97 @@ ); let conversation_id = &payload.conversation_id; - debug!( "Processing refresh for conversation ID: {}", conversation_id ); - let mut direct_cast_message = payload.message.clone(); - if let Some(medias) = replace_media_urls(&mut direct_cast_message) { - if let Err(err) = self.mirror_media_to_blob(medias).await { - if matches!(err, crate::farcaster::error::Error::Timeout) { - info!( - "Timeout when mirroring multimedia. Falling back to originals." - ); - } else { - warn!("Failed to mirror multimedia to blob: {err:?}"); + let notif_payload_future = async || { + let conversation = self + .farcaster_client + .fetch_conversation(&self.user_id, conversation_id) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to fetch conversation details: {e:?}", + )) + })?; + let current_user_fid = extract_fid_from_dcs_token(&self.token_data); + let notif = prepare_notif_payload( + payload, + &conversation, + current_user_fid.as_ref(), + ); + Ok::<_, TokenConnectionError>(notif) + }; + + let message_payload_future = async || { + let mut direct_cast_message = payload.message.clone(); + if let Some(medias) = replace_media_urls(&mut direct_cast_message) { + if let Err(err) = self.mirror_media_to_blob(medias).await { + if matches!(err, crate::farcaster::error::Error::Timeout) { + info!( + "Timeout when mirroring multimedia. Falling back to originals." + ); + } else { + warn!("Failed to mirror multimedia to blob: {err:?}"); + } + direct_cast_message = payload.message.clone(); } + } else { direct_cast_message = payload.message.clone(); } - } else { - direct_cast_message = payload.message.clone(); - } - let request = PeersDeviceListsRequest { - user_ids: vec![self.user_id.clone()], - }; - let user_devices_response = client - .get_device_lists_for_users(request) - .await - .map_err(|e| { - TokenConnectionError::MessageHandlingFailed(format!( - "Failed to get user devices: {}", - e - )) - })? - .into_inner(); - - let user_devices_option = user_devices_response - .users_devices_platform_details - .get(&self.user_id); - let message = NewFarcasterMessage { - message: direct_cast_message, - }; - if let Some(user_devices) = user_devices_option { + let message = NewFarcasterMessage { + message: direct_cast_message, + }; let message_payload = serde_json::to_string(&message).map_err(|e| { TokenConnectionError::MessageHandlingFailed(format!( "Failed to serialize: {}", e )) })?; + Ok::<_, TokenConnectionError>(message_payload) + }; - // Filter out keyservers - let device_ids = user_devices.devices_platform_details.iter().filter_map( - |(device_id, platform_details)| { - if platform_details.device_type() == DeviceType::Keyserver { - None - } else { - Some(device_id) + let recipient_devices = self.get_self_user_device_list(client).await?; + + let (message_payload_result, notif_payload_result) = + tokio::join!(message_payload_future(), notif_payload_future()); + + let message_payload = message_payload_result?; + let notif: Option<_> = match notif_payload_result { + Ok(notif_payload) => notif_payload, + Err(err) => { + tracing::error!("Failed to prepare notif payload: {:?}", err); + None + } + }; + + for (device_id, platform_details) in &recipient_devices { + let notif_future = async || { + let Some(notif_payload) = ¬if else { + return; + }; + let Ok(platform) = platform_details.device_type().try_into() else { + return; + }; + let target = NotifRecipientDescriptor { + platform, + device_id: device_id.to_string(), + }; + if let Err(err) = self + .notif_client + .send_notif(notif_payload.clone(), target) + .await + { + if !err.is_invalid_token() { + tracing::error!("Failed to send Farcaster notif: {:?}", err); } - }, - ); + } + }; - for device_id in device_ids { + let message_future = async || { self .message_sender .simple_send_message_to_device(device_id, message_payload.clone()) @@ -574,13 +613,52 @@ "Failed to send a message: {}", e )) - })?; - } + }) + }; + + let (message_result, _) = tokio::join!(message_future(), notif_future()); + message_result?; } Ok(()) } + async fn get_self_user_device_list( + &self, + client: &mut ChainedInterceptedServicesAuthClient, + ) -> Result, TokenConnectionError> { + let request = PeersDeviceListsRequest { + user_ids: vec![self.user_id.clone()], + }; + let mut user_devices_response = client + .get_device_lists_for_users(request) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to get user devices: {}", + e + )) + })? + .into_inner(); + + let Some(user_devices) = user_devices_response + .users_devices_platform_details + .remove(&self.user_id) + else { + return Ok(Vec::new()); + }; + + // Filter out keyservers + let result = user_devices + .devices_platform_details + .into_iter() + .filter(|(_, platform_details)| { + platform_details.device_type() != DeviceType::Keyserver + }) + .collect(); + Ok(result) + } + async fn mirror_media_to_blob( &self, medias: Vec, @@ -644,3 +722,21 @@ Some(found_medias) } + +/// Farcaster DCs token string has specific format, from which we can extract +/// authenticated user's FID. +/// The pattern is as follows: `fc_dc_${FID}_remainingtokenvalue` +fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option { + const PREFIX: &str = "fc_dc_"; + const SEP: &str = "_"; + + let stripped = fc_dcs_token.strip_prefix(PREFIX)?; + let (fid, _rest) = stripped.split_once(SEP)?; + + let is_valid = fid.chars().all(|c: char| c.is_ascii_digit()); + if !is_valid { + return None; + } + + Some(fid.to_string()) +}