diff --git a/services/tunnelbroker/src/database/token_distributor.rs b/services/tunnelbroker/src/database/token_distributor.rs --- a/services/tunnelbroker/src/database/token_distributor.rs +++ b/services/tunnelbroker/src/database/token_distributor.rs @@ -3,15 +3,21 @@ use comm_lib::aws::ddb::operation::update_item::UpdateItemError; use comm_lib::aws::ddb::types::AttributeValue; use comm_lib::database::shared_tables::farcaster_tokens; -use comm_lib::database::{AttributeMap, Error}; +use comm_lib::database::{AttributeExtractor, AttributeMap, Error}; use futures_util::TryFutureExt; use tracing::{debug, error}; +pub struct TokenEntryInfo { + pub user_id: String, + pub token_data: String, + pub fid: String, +} + impl DatabaseClient { pub async fn scan_orphaned_tokens( &self, timeout_threshold: u64, - ) -> Result, Error> { + ) -> Result, Error> { debug!( "Starting scan for orphaned tokens - timeout_threshold: {}", timeout_threshold @@ -69,19 +75,17 @@ fn process_items( items: impl IntoIterator, - ) -> impl Iterator { - items.into_iter().filter_map(|item| { - if let ( - Some(AttributeValue::S(user_id)), - Some(AttributeValue::S(token_data_str)), - ) = ( - item.get(farcaster_tokens::PARTITION_KEY), - item.get(farcaster_tokens::FARCASTER_DCS_TOKEN), - ) { - Some((user_id.to_string(), token_data_str.to_string())) - } else { - None - } + ) -> impl Iterator { + items.into_iter().filter_map(|mut item| { + let user_id = item.take_attr(farcaster_tokens::PARTITION_KEY).ok()?; + let token_data = + item.take_attr(farcaster_tokens::FARCASTER_DCS_TOKEN).ok()?; + let fid = item.take_attr(farcaster_tokens::FARCASTER_ID).ok()?; + Some(TokenEntryInfo { + user_id, + token_data, + fid, + }) }) } diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -153,7 +153,9 @@ } let mut claimed_count = 0; - for (user_id, token_data) in orphaned_tokens { + for token_info in orphaned_tokens { + let user_id = token_info.user_id.clone(); + if claimed_count >= available_slots { info!( "Reached maximum connections limit ({}), stopping token claiming", @@ -198,11 +200,11 @@ "Starting WebSocket connection task for user: {}", redact_sensitive_data(&user_id) ); + TokenConnection::start( self.db.clone(), self.config.clone(), - user_id.clone(), - token_data, + token_info, self.amqp_connection.clone(), cancel_token.clone(), self.grpc_client.clone(), @@ -211,7 +213,7 @@ ); // Store the cancellation token - self.connections.insert(user_id.clone(), cancel_token); + self.connections.insert(user_id, cancel_token); claimed_count += 1; info!( "Active connections: {}/{}", diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -2,6 +2,7 @@ use crate::amqp_client::utils::BasicMessageSender; use crate::config::CONFIG; use crate::constants::MEDIA_MIRROR_TIMEOUT; +use crate::database::token_distributor::TokenEntryInfo; use crate::database::DatabaseClient; use crate::farcaster::FarcasterClient; use crate::log::redact_sensitive_data; @@ -32,8 +33,7 @@ pub(crate) struct TokenConnection { db: DatabaseClient, config: TokenDistributorConfig, - user_id: String, - token_data: String, + token_info: TokenEntryInfo, amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, message_sender: BasicMessageSender, @@ -46,8 +46,7 @@ pub(crate) fn start( db: DatabaseClient, config: TokenDistributorConfig, - user_id: String, - token_data: String, + token_info: TokenEntryInfo, amqp_connection: AmqpConnection, cancellation_token: CancellationToken, grpc_client: ChainedInterceptedServicesAuthClient, @@ -57,11 +56,11 @@ let message_sender = BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection)); + let user_id = token_info.user_id.clone(); let connection = Self { db: db.clone(), config: config.clone(), - user_id: user_id.clone(), - token_data, + token_info, amqp_connection: amqp_connection.clone(), grpc_client, blob_client: S2SAuthedBlobClient::new( @@ -143,54 +142,55 @@ self, cancellation_token: CancellationToken, ) -> Result<(), TokenConnectionError> { + let user_id = &self.token_info.user_id; info!( "Starting connection for user: {}", - redact_sensitive_data(&self.user_id) + redact_sensitive_data(user_id) ); loop { tokio::select! { - result = self.connect_and_maintain(&self.token_data, &cancellation_token) => { + result = self.connect_and_maintain(&self.token_info.token_data, &cancellation_token) => { match result { Ok(_) => { - info!("Connection completed normally for user: {}", redact_sensitive_data(&self.user_id)); + info!("Connection completed normally for user: {}", redact_sensitive_data(user_id)); break; } Err(e) => { warn!( "Socket connection failed for user {}, reason: {}", - redact_sensitive_data(&self.user_id), + redact_sensitive_data(user_id), e ); // Check if we still own the token before retrying debug!( "Verifying token ownership for user {} before retry", - self.user_id + self.token_info.user_id ); - match self.db.update_token_heartbeat(&self.user_id, &self.config.instance_id).await { + match self.db.update_token_heartbeat(user_id, &self.config.instance_id).await { Ok(true) => { debug!( "Token ownership confirmed for user {}, restarting socket in 5 seconds", - self.user_id + user_id ); tokio::time::sleep(Duration::from_secs(5)).await; debug!( "Attempting socket reconnect for user {}", - self.user_id + user_id ); } Ok(false) => { warn!( "Lost token ownership for user {}, stopping reconnection attempts", - redact_sensitive_data(&self.user_id) + redact_sensitive_data(user_id) ); return Err(TokenConnectionError::TokenOwnershipLost); } Err(err) => { error!( "Failed to verify token ownership for user {}: {:?}, retrying in 5 seconds", - redact_sensitive_data(&self.user_id), + redact_sensitive_data(user_id), err ); tokio::time::sleep(Duration::from_secs(5)).await; @@ -200,7 +200,7 @@ } } _ = cancellation_token.cancelled() => { - info!("Connection cancelled for user: {}", redact_sensitive_data(&self.user_id)); + info!("Connection cancelled for user: {}", redact_sensitive_data(user_id)); return Err(TokenConnectionError::Cancelled); } } @@ -208,7 +208,7 @@ info!( "TokenConnection ended for user: {}", - redact_sensitive_data(&self.user_id) + redact_sensitive_data(user_id) ); Ok(()) } @@ -218,17 +218,16 @@ farcaster_token: &str, cancellation_token: &CancellationToken, ) -> Result<(), TokenConnectionError> { + let user_id = &self.token_info.user_id; + debug!( "Establishing WebSocket connection for user {} to {}", - self.user_id, self.config.farcaster_websocket_url + user_id, self.config.farcaster_websocket_url ); let (ws_stream, _) = connect_async(&self.config.farcaster_websocket_url).await?; - debug!( - "WebSocket connected successfully for user: {}", - self.user_id - ); + debug!("WebSocket connected successfully for user: {}", user_id); let (mut write, mut read) = ws_stream.split(); @@ -237,11 +236,11 @@ "messageType": "dc_authenticate", "data": farcaster_token }); - debug!("Sending authentication message for user: {}", self.user_id); + debug!("Sending authentication message for user: {}", user_id); if let Err(e) = write.send(Message::Text(auth_msg.to_string())).await { error!( "Failed to send auth message for user {}: {:?}, connection will be retried", - redact_sensitive_data(&self.user_id), + redact_sensitive_data(user_id), e ); return Err(TokenConnectionError::AuthenticationFailed(format!( @@ -252,17 +251,17 @@ info!( "WebSocket connected and authenticated successfully for user: {}", - redact_sensitive_data(&self.user_id) + redact_sensitive_data(user_id) ); // Set up AMQP topic listener for farcaster messages - let topic_name = format!("farcaster_user_{}", self.user_id); + let topic_name = format!("farcaster_user_{}", user_id); let mut amqp_consumer = match self.setup_amqp_consumer(&topic_name).await { Ok(consumer) => consumer, Err(e) => { error!( "Failed to setup AMQP consumer for user {}: {}", - redact_sensitive_data(&self.user_id), + redact_sensitive_data(user_id), e ); return Err(TokenConnectionError::AmqpSetupFailed(format!( @@ -279,7 +278,7 @@ trace!( "Ping timeout monitoring active for user: {} - timeout: {}s", - self.user_id, + user_id, self.config.ping_timeout.as_secs() ); @@ -293,21 +292,21 @@ match delivery_result { Ok(delivery) => { let payload = String::from_utf8_lossy(&delivery.data); - debug!("Received AMQP message for user {}: {}", self.user_id, payload); + debug!("Received AMQP message for user {}: {}", self.token_info.user_id, payload); // Forward message to WebSocket if let Err(e) = write.send(Message::Text(payload.to_string())).await { - error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(&self.user_id), e); + error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(user_id), e); } else { // Acknowledge the AMQP message if let Err(e) = delivery.ack(BasicAckOptions::default()).await { - error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(&self.user_id), e); + error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(user_id), e); } info!("Message {:?} sent", payload); } } Err(e) => { - error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(&self.user_id), e); + error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(user_id), e); } } } @@ -317,7 +316,7 @@ match msg { Some(Ok(msg)) => match msg { Message::Text(text) => { - debug!("Received message for {}: {}", self.user_id, text); + debug!("Received message for {}: {}", self.token_info.user_id, text); match serde_json::from_str::(&text) { Ok(farcaster_msg) => { match &farcaster_msg.payload { @@ -330,7 +329,7 @@ metricType = "TokenDistributor_ConnectionFailure", metricValue = 1, instanceId = self.config.instance_id, - userId = redact_sensitive_data(&self.user_id), + userId = redact_sensitive_data(user_id), errorType = "MessageHandlingFailed", "Failed to handle refresh direct cast conversation: {:?}", e @@ -351,29 +350,29 @@ } } Message::Binary(_data) => { - debug!("Received binary message for user: {}", self.user_id); + debug!("Received binary message for user: {}", user_id); } Message::Frame(_) => { - debug!("Received raw frame for user: {}", self.user_id); + debug!("Received raw frame for user: {}", user_id); } Message::Ping(data) => { let elapsed_since_last = last_ping.elapsed(); trace!("Received ping for user: {} ({}s since last ping), responding with pong", - self.user_id, elapsed_since_last.as_secs()); + user_id, elapsed_since_last.as_secs()); last_ping = Instant::now(); // Reset ping timeout ping_timeout.as_mut().reset(Instant::now() + self.config.ping_timeout); let _ = write.send(Message::Pong(data)).await; } Message::Pong(_data) => { - trace!("Received pong for user: {}", self.user_id); + trace!("Received pong for user: {}", user_id); } Message::Close(close_frame) => { let reason = if let Some(frame) = close_frame { let msg = format!("code: {}, reason: {}", frame.code, frame.reason); - error!("WebSocket closed for user {} - {}", redact_sensitive_data(&self.user_id), msg); + error!("WebSocket closed for user {} - {}", redact_sensitive_data(user_id), msg); msg } else { - error!("WebSocket closed for user {} without close frame", redact_sensitive_data(&self.user_id)); + error!("WebSocket closed for user {} without close frame", redact_sensitive_data(user_id)); "no close frame provided".to_string() }; return Err(TokenConnectionError::WebSocketClosed(reason)); @@ -382,7 +381,7 @@ Some(Err(e)) => { warn!( "WebSocket protocol error for user {}: {:?}, connection will be restarted", - redact_sensitive_data(&self.user_id), + redact_sensitive_data(user_id), e ); return Err(TokenConnectionError::WebSocketConnection(e)); @@ -390,7 +389,7 @@ None => { info!( "WebSocket stream ended unexpectedly for user: {}, connection will be restarted", - redact_sensitive_data(&self.user_id) + redact_sensitive_data(user_id) ); return Err(TokenConnectionError::StreamEnded); } @@ -399,21 +398,21 @@ // Send heartbeat updates _ = heartbeat_interval.tick() => { - match self.db.update_token_heartbeat(&self.user_id, &self.config.instance_id).await { + match self.db.update_token_heartbeat(user_id, &self.config.instance_id).await { Ok(true) => { - trace!("Heartbeat updated successfully for user: {}", self.user_id); + trace!("Heartbeat updated successfully for user: {}", user_id); } Ok(false) => { warn!( "Lost token ownership for user: {} - another instance may have claimed it", - redact_sensitive_data(&self.user_id) + redact_sensitive_data(user_id) ); return Err(TokenConnectionError::TokenOwnershipLost); } Err(e) => { error!( "Failed to update heartbeat for user {}: {:?}", - redact_sensitive_data(&self.user_id), + redact_sensitive_data(user_id), e ); return Err(TokenConnectionError::HeartbeatFailed(format!("Database error: {}", e))); @@ -426,14 +425,14 @@ let elapsed = last_ping.elapsed(); error!( "Ping timeout for user: {} - no ping received for {}s, connection dead", - redact_sensitive_data(&self.user_id), elapsed.as_secs() + redact_sensitive_data(user_id), elapsed.as_secs() ); return Err(TokenConnectionError::PingTimeout); } // Handle cancellation _ = cancellation_token.cancelled() => { - info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(&self.user_id)); + info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(user_id)); // Send close frame before terminating let _ = write.send(Message::Close(None)).await; @@ -461,7 +460,7 @@ .await?; // Declare queue with unique name for this connection - let queue_name = format!("{}_{}", topic_name, self.user_id); + let queue_name = format!("{}_{}", topic_name, self.token_info.user_id); let queue = channel .queue_declare( &queue_name, @@ -489,7 +488,7 @@ let consumer = channel .basic_consume( queue.name().as_str(), - &format!("consumer_{}_{}", topic_name, self.user_id), + &format!("consumer_{}_{}", topic_name, self.token_info.user_id), BasicConsumeOptions::default(), FieldTable::default(), ) @@ -497,7 +496,7 @@ debug!( "AMQP consumer set up for topic: {} user: {}", - topic_name, self.user_id + topic_name, self.token_info.user_id ); Ok(consumer) } @@ -509,7 +508,7 @@ ) -> Result<(), TokenConnectionError> { debug!( "Handling refresh direct cast conversation for user: {}", - self.user_id + self.token_info.user_id ); let conversation_id = &payload.conversation_id; @@ -521,18 +520,17 @@ let notif_payload_future = async || { let conversation = self .farcaster_client - .fetch_conversation(&self.user_id, conversation_id) + .fetch_conversation(&self.token_info.user_id, conversation_id) .await .map_err(|e| { TokenConnectionError::MessageHandlingFailed(format!( "Failed to fetch conversation details: {e:?}", )) })?; - let current_user_fid = extract_fid_from_dcs_token(&self.token_data); let notif = prepare_notif_payload( payload, &conversation, - current_user_fid.as_ref(), + Some(&self.token_info.fid), ); Ok::<_, TokenConnectionError>(notif) }; @@ -628,7 +626,7 @@ client: &mut ChainedInterceptedServicesAuthClient, ) -> Result, TokenConnectionError> { let request = PeersDeviceListsRequest { - user_ids: vec![self.user_id.clone()], + user_ids: vec![self.token_info.user_id.clone()], }; let mut user_devices_response = client .get_device_lists_for_users(request) @@ -643,7 +641,7 @@ let Some(user_devices) = user_devices_response .users_devices_platform_details - .remove(&self.user_id) + .remove(&self.token_info.user_id) else { return Ok(Vec::new()); }; @@ -722,21 +720,3 @@ Some(found_medias) } - -/// Farcaster DCs token string has specific format, from which we can extract -/// authenticated user's FID. -/// The pattern is as follows: `fc_dc_${FID}_remainingtokenvalue` -fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option { - const PREFIX: &str = "fc_dc_"; - const SEP: &str = "_"; - - let stripped = fc_dcs_token.strip_prefix(PREFIX)?; - let (fid, _rest) = stripped.split_once(SEP)?; - - let is_valid = fid.chars().all(|c: char| c.is_ascii_digit()); - if !is_valid { - return None; - } - - Some(fid.to_string()) -}