Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32160910
D15373.1765041872.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
8 KB
Referenced Files
None
Subscribers
None
D15373.1765041872.diff
View Options
diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -102,6 +102,7 @@
&amqp_connection,
grpc_client,
&auth_service,
+ farcaster_client.clone(),
);
tokio::select! {
diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs
--- a/services/tunnelbroker/src/token_distributor/mod.rs
+++ b/services/tunnelbroker/src/token_distributor/mod.rs
@@ -1,9 +1,11 @@
mod config;
mod error;
+mod notif_utils;
mod token_connection;
use crate::constants::error_types;
use crate::database::DatabaseClient;
+use crate::farcaster::FarcasterClient;
pub(crate) use crate::token_distributor::config::TokenDistributorConfig;
use crate::token_distributor::token_connection::TokenConnection;
use crate::{amqp_client::amqp::AmqpConnection, log::redact_sensitive_data};
@@ -23,6 +25,7 @@
amqp_connection: AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: AuthService,
+ farcaster_client: FarcasterClient,
}
impl TokenDistributor {
@@ -32,6 +35,7 @@
amqp_connection: &AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: &AuthService,
+ farcaster_client: FarcasterClient,
) -> Self {
info!(
"Initializing TokenDistributor - max_connections: {}, \
@@ -61,6 +65,7 @@
amqp_connection: amqp_connection.clone(),
grpc_client,
auth_service: auth_service.clone(),
+ farcaster_client,
}
}
@@ -202,6 +207,7 @@
cancel_token.clone(),
self.grpc_client.clone(),
&self.auth_service,
+ self.farcaster_client.clone(),
);
// Store the cancellation token
diff --git a/services/tunnelbroker/src/token_distributor/notif_utils.rs b/services/tunnelbroker/src/token_distributor/notif_utils.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/token_distributor/notif_utils.rs
@@ -0,0 +1,71 @@
+use tunnelbroker_messages::farcaster::{
+ DirectCastConversation, DirectCastMessageType,
+ RefreshDirectCastConversationPayload,
+};
+
+use crate::notifs::GenericNotifPayload;
+
+pub fn prepare_notif_payload(
+ payload: &RefreshDirectCastConversationPayload,
+ conversation: &DirectCastConversation,
+ recipient_fid: Option<&String>,
+) -> Option<GenericNotifPayload> {
+ let RefreshDirectCastConversationPayload {
+ conversation_id,
+ message,
+ ..
+ } = payload;
+
+ if conversation.muted {
+ // TODO: badge only?
+ return None;
+ }
+ if message.message_type != DirectCastMessageType::Text {
+ return None;
+ }
+
+ // Don't send a notif from self
+ if recipient_fid.is_some_and(|fid| *fid == message.sender_fid.to_string()) {
+ return None;
+ }
+
+ let message_metadata = message.extra.get("metadata");
+ let has_photos =
+ message_metadata.is_some_and(|metadata| metadata["medias"].is_array());
+ let has_videos =
+ message_metadata.is_some_and(|metadata| metadata["videos"].is_array());
+
+ let title = conversation
+ .name
+ .as_deref()
+ .or_else(|| {
+ conversation
+ .participant(message.sender_fid)
+ .map(|u| u.display_name.as_str())
+ })
+ .unwrap_or("Farcaster");
+
+ let body = if has_photos {
+ "[Photo message]"
+ } else if has_videos {
+ "[Video message]"
+ } else {
+ message.message.as_str()
+ };
+
+ Some(GenericNotifPayload {
+ title: trim_text(title, 100),
+ body: trim_text(body, 300),
+ thread_id: format!("FARCASTER#{}", conversation_id),
+ })
+}
+
/// Shortens `text` so that the result is at most `max_length` bytes long.
///
/// - If `text` already fits, it is returned unchanged.
/// - If `max_length` is greater than 3, the text is cut and `...` is
///   appended; the ellipsis counts towards `max_length`.
/// - If `max_length` is 3 or less there is no room for an ellipsis, so the
///   text is simply cut.
///
/// Lengths are measured in UTF-8 bytes (`str::len`). The cut position is
/// moved back to the nearest character boundary, so multi-byte characters
/// (emoji, accented letters, CJK, ...) are never split and slicing cannot
/// panic. As a consequence the result may be a few bytes shorter than
/// `max_length`.
fn trim_text(text: &str, max_length: usize) -> String {
  if text.len() <= max_length {
    return text.to_string();
  }

  // Reserve room for the ellipsis only when it actually fits.
  let (budget, suffix) = if max_length <= 3 {
    (max_length, "")
  } else {
    (max_length - 3, "...")
  };

  // `budget < text.len()` holds here. Index 0 is always a char boundary, so
  // this loop terminates without underflow.
  let mut cut = budget;
  while !text.is_char_boundary(cut) {
    cut -= 1;
  }

  format!("{}{}", &text[..cut], suffix)
}
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -3,9 +3,12 @@
use crate::config::CONFIG;
use crate::constants::MEDIA_MIRROR_TIMEOUT;
use crate::database::DatabaseClient;
+use crate::farcaster::FarcasterClient;
use crate::log::redact_sensitive_data;
+use crate::notifs::{GenericNotifClient, NotifRecipientDescriptor};
use crate::token_distributor::config::TokenDistributorConfig;
use crate::token_distributor::error::TokenConnectionError;
+use crate::token_distributor::notif_utils::prepare_notif_payload;
use comm_lib::auth::AuthService;
use comm_lib::blob::client::S2SAuthedBlobClient;
use comm_lib::blob::types::http::MirroredMediaInfo;
@@ -32,6 +35,8 @@
grpc_client: ChainedInterceptedServicesAuthClient,
message_sender: BasicMessageSender,
blob_client: S2SAuthedBlobClient,
+ farcaster_client: FarcasterClient,
+ notif_client: GenericNotifClient,
}
impl TokenConnection {
@@ -44,6 +49,7 @@
cancellation_token: CancellationToken,
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: &AuthService,
+ farcaster_client: FarcasterClient,
) {
let message_sender =
BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection));
@@ -59,6 +65,8 @@
auth_service,
CONFIG.blob_service_url.clone(),
),
+ farcaster_client,
+ notif_client: GenericNotifClient::new(db.clone(), message_sender.clone()),
message_sender,
};
@@ -502,6 +510,18 @@
);
let conversation_id = &payload.conversation_id;
+ let conversation = self
+ .farcaster_client
+ .fetch_conversation(&self.user_id, conversation_id)
+ .await
+ .map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to fetch conversation details: {e:?}",
+ ))
+ })?;
+ let current_user_fid = extract_fid_from_dcs_token(&self.token_data);
+ let notif =
+ prepare_notif_payload(payload, &conversation, current_user_fid.as_ref());
debug!(
"Processing refresh for conversation ID: {}",
@@ -538,21 +558,45 @@
})?
.into_inner();
- let user_devices_option = user_devices_response
+ let Some(user_devices) = user_devices_response
.users_devices_platform_details
- .get(&self.user_id);
+ .get(&self.user_id)
+ else {
+ return Ok(());
+ };
+
let message = NewFarcasterMessage {
message: direct_cast_message,
};
- if let Some(user_devices) = user_devices_option {
- let message_payload = serde_json::to_string(&message).map_err(|e| {
- TokenConnectionError::MessageHandlingFailed(format!(
- "Failed to serialize: {}",
- e
- ))
- })?;
+ let message_payload = serde_json::to_string(&message).map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to serialize: {}",
+ e
+ ))
+ })?;
+
+ for (device_id, platform_details) in &user_devices.devices_platform_details
+ {
+ let notif_future = async || {
+ let Some(notif_payload) = &notif else {
+ return;
+ };
+ let target = NotifRecipientDescriptor {
+ platform: platform_details.device_type().into(),
+ device_id: device_id.to_string(),
+ };
+ if let Err(err) = self
+ .notif_client
+ .send_notif(notif_payload.clone(), target)
+ .await
+ {
+ if !err.is_invalid_token() {
+ tracing::error!("Failed to send Farcaster notif: {:?}", err);
+ }
+ }
+ };
- for device_id in user_devices.devices_platform_details.keys() {
+ let message_future = async || {
self
.message_sender
.simple_send_message_to_device(device_id, message_payload.clone())
@@ -562,8 +606,11 @@
"Failed to send a message: {}",
e
))
- })?;
- }
+ })
+ };
+
+ let (message_result, _) = tokio::join!(message_future(), notif_future());
+ message_result?;
}
Ok(())
@@ -632,3 +679,21 @@
Some(found_medias)
}
+
/// Farcaster DCs token string has specific format, from which we can extract
/// authenticated user's FID.
/// The pattern is as follows: `fc_dc_${FID}_remainingtokenvalue`
///
/// Returns `None` when the token does not start with `fc_dc_`, when there is
/// no `_` separator after the FID, or when the FID part is empty or contains
/// anything other than ASCII digits.
fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option<String> {
  const PREFIX: &str = "fc_dc_";
  const SEP: char = '_';

  let (fid, _rest) = fc_dcs_token.strip_prefix(PREFIX)?.split_once(SEP)?;

  // `all` is vacuously true for an empty string, so emptiness has to be
  // rejected explicitly; otherwise `fc_dc__xyz` would yield `Some("")`.
  let is_valid = !fid.is_empty() && fid.bytes().all(|b| b.is_ascii_digit());

  is_valid.then(|| fid.to_string())
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 6, 5:24 PM (20 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5839776
Default Alt Text
D15373.1765041872.diff (8 KB)
Attached To
Mode
D15373: [tunnelbroker] Send notifs on FC conversation refresh
Attached
Detach File
Event Timeline
Log In to Comment