Page MenuHomePhorge

D15373.1765041872.diff
No OneTemporary

Size
8 KB
Referenced Files
None
Subscribers
None

D15373.1765041872.diff

diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -102,6 +102,7 @@
&amqp_connection,
grpc_client,
&auth_service,
+ farcaster_client.clone(),
);
tokio::select! {
diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs
--- a/services/tunnelbroker/src/token_distributor/mod.rs
+++ b/services/tunnelbroker/src/token_distributor/mod.rs
@@ -1,9 +1,11 @@
mod config;
mod error;
+mod notif_utils;
mod token_connection;
use crate::constants::error_types;
use crate::database::DatabaseClient;
+use crate::farcaster::FarcasterClient;
pub(crate) use crate::token_distributor::config::TokenDistributorConfig;
use crate::token_distributor::token_connection::TokenConnection;
use crate::{amqp_client::amqp::AmqpConnection, log::redact_sensitive_data};
@@ -23,6 +25,7 @@
amqp_connection: AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: AuthService,
+ farcaster_client: FarcasterClient,
}
impl TokenDistributor {
@@ -32,6 +35,7 @@
amqp_connection: &AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: &AuthService,
+ farcaster_client: FarcasterClient,
) -> Self {
info!(
"Initializing TokenDistributor - max_connections: {}, \
@@ -61,6 +65,7 @@
amqp_connection: amqp_connection.clone(),
grpc_client,
auth_service: auth_service.clone(),
+ farcaster_client,
}
}
@@ -202,6 +207,7 @@
cancel_token.clone(),
self.grpc_client.clone(),
&self.auth_service,
+ self.farcaster_client.clone(),
);
// Store the cancellation token
diff --git a/services/tunnelbroker/src/token_distributor/notif_utils.rs b/services/tunnelbroker/src/token_distributor/notif_utils.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/token_distributor/notif_utils.rs
@@ -0,0 +1,71 @@
+use tunnelbroker_messages::farcaster::{
+ DirectCastConversation, DirectCastMessageType,
+ RefreshDirectCastConversationPayload,
+};
+
+use crate::notifs::GenericNotifPayload;
+
+/// Builds the notification payload for a refreshed Direct Cast conversation,
+/// or returns `None` when no notification should be produced.
+///
+/// `None` is returned when the conversation is muted, when the message is not
+/// a text message, or when `recipient_fid` equals the sender's FID.
+///
+/// The title is the conversation name, falling back to the sender's display
+/// name and finally to "Farcaster". The body is a placeholder when the
+/// message metadata carries a `medias` or `videos` array, and the message
+/// text otherwise. Both are shortened with `trim_text`.
+pub fn prepare_notif_payload(
+  payload: &RefreshDirectCastConversationPayload,
+  conversation: &DirectCastConversation,
+  recipient_fid: Option<&String>,
+) -> Option<GenericNotifPayload> {
+  let message = &payload.message;
+  let conversation_id = &payload.conversation_id;
+
+  // TODO: badge only for muted conversations?
+  let should_skip = conversation.muted
+    || message.message_type != DirectCastMessageType::Text
+    // Don't send a notif from self
+    || recipient_fid.is_some_and(|fid| *fid == message.sender_fid.to_string());
+  if should_skip {
+    return None;
+  }
+
+  let metadata = message.extra.get("metadata");
+  let has_photos = metadata.is_some_and(|meta| meta["medias"].is_array());
+  let has_videos = metadata.is_some_and(|meta| meta["videos"].is_array());
+
+  let title = match conversation.name.as_deref() {
+    Some(conversation_name) => conversation_name,
+    None => conversation
+      .participant(message.sender_fid)
+      .map(|sender| sender.display_name.as_str())
+      .unwrap_or("Farcaster"),
+  };
+
+  // Photos take precedence over videos when both are present.
+  let body = match (has_photos, has_videos) {
+    (true, _) => "[Photo message]",
+    (false, true) => "[Video message]",
+    (false, false) => message.message.as_str(),
+  };
+
+  Some(GenericNotifPayload {
+    title: trim_text(title, 100),
+    body: trim_text(body, 300),
+    thread_id: format!("FARCASTER#{}", conversation_id),
+  })
+}
+
+/// Shortens `text` so that it occupies at most `max_length` bytes.
+///
+/// Text that already fits is returned unchanged. Longer text is cut and, when
+/// `max_length` is greater than 3, ends with "..." (which counts towards the
+/// limit). The cut is moved back to the nearest UTF-8 character boundary, so
+/// multi-byte text (emoji, accented letters, CJK) is never split in the
+/// middle of a character - slicing there would panic. As a consequence the
+/// result can be a few bytes shorter than `max_length`.
+fn trim_text(text: &str, max_length: usize) -> String {
+  const ELLIPSIS: &str = "...";
+
+  if text.len() <= max_length {
+    return text.to_string();
+  }
+
+  // Too small a budget to fit an ellipsis: hard-cut without a suffix.
+  let (budget, suffix) = if max_length <= ELLIPSIS.len() {
+    (max_length, "")
+  } else {
+    (max_length - ELLIPSIS.len(), ELLIPSIS)
+  };
+
+  // `budget < text.len()` here. Walk back until the cut lands on a char
+  // boundary; index 0 always is one, so the loop terminates.
+  let mut end = budget;
+  while !text.is_char_boundary(end) {
+    end -= 1;
+  }
+
+  format!("{}{}", &text[..end], suffix)
+}
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -3,9 +3,12 @@
use crate::config::CONFIG;
use crate::constants::MEDIA_MIRROR_TIMEOUT;
use crate::database::DatabaseClient;
+use crate::farcaster::FarcasterClient;
use crate::log::redact_sensitive_data;
+use crate::notifs::{GenericNotifClient, NotifRecipientDescriptor};
use crate::token_distributor::config::TokenDistributorConfig;
use crate::token_distributor::error::TokenConnectionError;
+use crate::token_distributor::notif_utils::prepare_notif_payload;
use comm_lib::auth::AuthService;
use comm_lib::blob::client::S2SAuthedBlobClient;
use comm_lib::blob::types::http::MirroredMediaInfo;
@@ -32,6 +35,8 @@
grpc_client: ChainedInterceptedServicesAuthClient,
message_sender: BasicMessageSender,
blob_client: S2SAuthedBlobClient,
+ farcaster_client: FarcasterClient,
+ notif_client: GenericNotifClient,
}
impl TokenConnection {
@@ -44,6 +49,7 @@
cancellation_token: CancellationToken,
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: &AuthService,
+ farcaster_client: FarcasterClient,
) {
let message_sender =
BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection));
@@ -59,6 +65,8 @@
auth_service,
CONFIG.blob_service_url.clone(),
),
+ farcaster_client,
+ notif_client: GenericNotifClient::new(db.clone(), message_sender.clone()),
message_sender,
};
@@ -502,6 +510,18 @@
);
let conversation_id = &payload.conversation_id;
+ let conversation = self
+ .farcaster_client
+ .fetch_conversation(&self.user_id, conversation_id)
+ .await
+ .map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to fetch conversation details: {e:?}",
+ ))
+ })?;
+ let current_user_fid = extract_fid_from_dcs_token(&self.token_data);
+ let notif =
+ prepare_notif_payload(payload, &conversation, current_user_fid.as_ref());
debug!(
"Processing refresh for conversation ID: {}",
@@ -538,21 +558,45 @@
})?
.into_inner();
- let user_devices_option = user_devices_response
+ let Some(user_devices) = user_devices_response
.users_devices_platform_details
- .get(&self.user_id);
+ .get(&self.user_id)
+ else {
+ return Ok(());
+ };
+
let message = NewFarcasterMessage {
message: direct_cast_message,
};
- if let Some(user_devices) = user_devices_option {
- let message_payload = serde_json::to_string(&message).map_err(|e| {
- TokenConnectionError::MessageHandlingFailed(format!(
- "Failed to serialize: {}",
- e
- ))
- })?;
+ let message_payload = serde_json::to_string(&message).map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to serialize: {}",
+ e
+ ))
+ })?;
+
+ for (device_id, platform_details) in &user_devices.devices_platform_details
+ {
+ let notif_future = async || {
+ let Some(notif_payload) = &notif else {
+ return;
+ };
+ let target = NotifRecipientDescriptor {
+ platform: platform_details.device_type().into(),
+ device_id: device_id.to_string(),
+ };
+ if let Err(err) = self
+ .notif_client
+ .send_notif(notif_payload.clone(), target)
+ .await
+ {
+ if !err.is_invalid_token() {
+ tracing::error!("Failed to send Farcaster notif: {:?}", err);
+ }
+ }
+ };
- for device_id in user_devices.devices_platform_details.keys() {
+ let message_future = async || {
self
.message_sender
.simple_send_message_to_device(device_id, message_payload.clone())
@@ -562,8 +606,11 @@
"Failed to send a message: {}",
e
))
- })?;
- }
+ })
+ };
+
+ let (message_result, _) = tokio::join!(message_future(), notif_future());
+ message_result?;
}
Ok(())
@@ -632,3 +679,21 @@
Some(found_medias)
}
+
+/// Extracts the authenticated user's FID from a Farcaster DCs token.
+///
+/// The token is expected to follow the pattern
+/// `fc_dc_${FID}_remainingtokenvalue`, where the FID is a non-empty run of
+/// ASCII digits.
+///
+/// Returns `None` when the prefix is missing, when no `_` separator follows
+/// the FID, or when the FID part is empty or contains a non-digit character.
+fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option<String> {
+  const PREFIX: &str = "fc_dc_";
+  const SEP: &str = "_";
+
+  let stripped = fc_dcs_token.strip_prefix(PREFIX)?;
+  let (fid, _rest) = stripped.split_once(SEP)?;
+
+  // `all` is vacuously true for an empty string, so emptiness has to be
+  // rejected explicitly; otherwise `fc_dc__xyz` would yield `Some("")`.
+  let is_valid = !fid.is_empty() && fid.bytes().all(|b| b.is_ascii_digit());
+
+  is_valid.then(|| fid.to_string())
+}

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 6, 5:24 PM (20 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5839776
Default Alt Text
D15373.1765041872.diff (8 KB)

Event Timeline