Page MenuHomePhorge

D15379.1765044615.diff
No OneTemporary

Size
20 KB
Referenced Files
None
Subscribers
None

D15379.1765044615.diff

diff --git a/services/tunnelbroker/src/database/token_distributor.rs b/services/tunnelbroker/src/database/token_distributor.rs
--- a/services/tunnelbroker/src/database/token_distributor.rs
+++ b/services/tunnelbroker/src/database/token_distributor.rs
@@ -3,15 +3,21 @@
use comm_lib::aws::ddb::operation::update_item::UpdateItemError;
use comm_lib::aws::ddb::types::AttributeValue;
use comm_lib::database::shared_tables::farcaster_tokens;
-use comm_lib::database::{AttributeMap, Error};
+use comm_lib::database::{AttributeExtractor, AttributeMap, Error};
use futures_util::TryFutureExt;
use tracing::{debug, error};
+pub struct TokenEntryInfo {
+ pub user_id: String,
+ pub token_data: String,
+ pub fid: String,
+}
+
impl DatabaseClient {
pub async fn scan_orphaned_tokens(
&self,
timeout_threshold: u64,
- ) -> Result<Vec<(String, String)>, Error> {
+ ) -> Result<Vec<TokenEntryInfo>, Error> {
debug!(
"Starting scan for orphaned tokens - timeout_threshold: {}",
timeout_threshold
@@ -69,19 +75,17 @@
fn process_items(
items: impl IntoIterator<Item = AttributeMap>,
- ) -> impl Iterator<Item = (String, String)> {
- items.into_iter().filter_map(|item| {
- if let (
- Some(AttributeValue::S(user_id)),
- Some(AttributeValue::S(token_data_str)),
- ) = (
- item.get(farcaster_tokens::PARTITION_KEY),
- item.get(farcaster_tokens::FARCASTER_DCS_TOKEN),
- ) {
- Some((user_id.to_string(), token_data_str.to_string()))
- } else {
- None
- }
+ ) -> impl Iterator<Item = TokenEntryInfo> {
+ items.into_iter().filter_map(|mut item| {
+ let user_id = item.take_attr(farcaster_tokens::PARTITION_KEY).ok()?;
+ let token_data =
+ item.take_attr(farcaster_tokens::FARCASTER_DCS_TOKEN).ok()?;
+ let fid = item.take_attr(farcaster_tokens::FARCASTER_ID).ok()?;
+ Some(TokenEntryInfo {
+ user_id,
+ token_data,
+ fid,
+ })
})
}
diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs
--- a/services/tunnelbroker/src/token_distributor/mod.rs
+++ b/services/tunnelbroker/src/token_distributor/mod.rs
@@ -153,7 +153,9 @@
}
let mut claimed_count = 0;
- for (user_id, token_data) in orphaned_tokens {
+ for token_info in orphaned_tokens {
+ let user_id = token_info.user_id.clone();
+
if claimed_count >= available_slots {
info!(
"Reached maximum connections limit ({}), stopping token claiming",
@@ -198,11 +200,11 @@
"Starting WebSocket connection task for user: {}",
redact_sensitive_data(&user_id)
);
+
TokenConnection::start(
self.db.clone(),
self.config.clone(),
- user_id.clone(),
- token_data,
+ token_info,
self.amqp_connection.clone(),
cancel_token.clone(),
self.grpc_client.clone(),
@@ -211,7 +213,7 @@
);
// Store the cancellation token
- self.connections.insert(user_id.clone(), cancel_token);
+ self.connections.insert(user_id, cancel_token);
claimed_count += 1;
info!(
"Active connections: {}/{}",
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -2,6 +2,7 @@
use crate::amqp_client::utils::BasicMessageSender;
use crate::config::CONFIG;
use crate::constants::MEDIA_MIRROR_TIMEOUT;
+use crate::database::token_distributor::TokenEntryInfo;
use crate::database::DatabaseClient;
use crate::farcaster::FarcasterClient;
use crate::log::redact_sensitive_data;
@@ -32,8 +33,7 @@
pub(crate) struct TokenConnection {
db: DatabaseClient,
config: TokenDistributorConfig,
- user_id: String,
- token_data: String,
+ token_info: TokenEntryInfo,
amqp_connection: AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
message_sender: BasicMessageSender,
@@ -46,8 +46,7 @@
pub(crate) fn start(
db: DatabaseClient,
config: TokenDistributorConfig,
- user_id: String,
- token_data: String,
+ token_info: TokenEntryInfo,
amqp_connection: AmqpConnection,
cancellation_token: CancellationToken,
grpc_client: ChainedInterceptedServicesAuthClient,
@@ -57,11 +56,11 @@
let message_sender =
BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection));
+ let user_id = token_info.user_id.clone();
let connection = Self {
db: db.clone(),
config: config.clone(),
- user_id: user_id.clone(),
- token_data,
+ token_info,
amqp_connection: amqp_connection.clone(),
grpc_client,
blob_client: S2SAuthedBlobClient::new(
@@ -143,54 +142,55 @@
self,
cancellation_token: CancellationToken,
) -> Result<(), TokenConnectionError> {
+ let user_id = &self.token_info.user_id;
info!(
"Starting connection for user: {}",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
loop {
tokio::select! {
- result = self.connect_and_maintain(&self.token_data, &cancellation_token) => {
+ result = self.connect_and_maintain(&self.token_info.token_data, &cancellation_token) => {
match result {
Ok(_) => {
- info!("Connection completed normally for user: {}", redact_sensitive_data(&self.user_id));
+ info!("Connection completed normally for user: {}", redact_sensitive_data(user_id));
break;
}
Err(e) => {
warn!(
"Socket connection failed for user {}, reason: {}",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
// Check if we still own the token before retrying
debug!(
"Verifying token ownership for user {} before retry",
- self.user_id
+ self.token_info.user_id
);
- match self.db.update_token_heartbeat(&self.user_id, &self.config.instance_id).await {
+ match self.db.update_token_heartbeat(user_id, &self.config.instance_id).await {
Ok(true) => {
debug!(
"Token ownership confirmed for user {}, restarting socket in 5 seconds",
- self.user_id
+ user_id
);
tokio::time::sleep(Duration::from_secs(5)).await;
debug!(
"Attempting socket reconnect for user {}",
- self.user_id
+ user_id
);
}
Ok(false) => {
warn!(
"Lost token ownership for user {}, stopping reconnection attempts",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
return Err(TokenConnectionError::TokenOwnershipLost);
}
Err(err) => {
error!(
"Failed to verify token ownership for user {}: {:?}, retrying in 5 seconds",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
err
);
tokio::time::sleep(Duration::from_secs(5)).await;
@@ -200,7 +200,7 @@
}
}
_ = cancellation_token.cancelled() => {
- info!("Connection cancelled for user: {}", redact_sensitive_data(&self.user_id));
+ info!("Connection cancelled for user: {}", redact_sensitive_data(user_id));
return Err(TokenConnectionError::Cancelled);
}
}
@@ -208,7 +208,7 @@
info!(
"TokenConnection ended for user: {}",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
Ok(())
}
@@ -218,17 +218,16 @@
farcaster_token: &str,
cancellation_token: &CancellationToken,
) -> Result<(), TokenConnectionError> {
+ let user_id = &self.token_info.user_id;
+
debug!(
"Establishing WebSocket connection for user {} to {}",
- self.user_id, self.config.farcaster_websocket_url
+ user_id, self.config.farcaster_websocket_url
);
let (ws_stream, _) =
connect_async(&self.config.farcaster_websocket_url).await?;
- debug!(
- "WebSocket connected successfully for user: {}",
- self.user_id
- );
+ debug!("WebSocket connected successfully for user: {}", user_id);
let (mut write, mut read) = ws_stream.split();
@@ -237,11 +236,11 @@
"messageType": "dc_authenticate",
"data": farcaster_token
});
- debug!("Sending authentication message for user: {}", self.user_id);
+ debug!("Sending authentication message for user: {}", user_id);
if let Err(e) = write.send(Message::Text(auth_msg.to_string())).await {
error!(
"Failed to send auth message for user {}: {:?}, connection will be retried",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::AuthenticationFailed(format!(
@@ -252,17 +251,17 @@
info!(
"WebSocket connected and authenticated successfully for user: {}",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
// Set up AMQP topic listener for farcaster messages
- let topic_name = format!("farcaster_user_{}", self.user_id);
+ let topic_name = format!("farcaster_user_{}", user_id);
let mut amqp_consumer = match self.setup_amqp_consumer(&topic_name).await {
Ok(consumer) => consumer,
Err(e) => {
error!(
"Failed to setup AMQP consumer for user {}: {}",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::AmqpSetupFailed(format!(
@@ -279,7 +278,7 @@
trace!(
"Ping timeout monitoring active for user: {} - timeout: {}s",
- self.user_id,
+ user_id,
self.config.ping_timeout.as_secs()
);
@@ -293,21 +292,21 @@
match delivery_result {
Ok(delivery) => {
let payload = String::from_utf8_lossy(&delivery.data);
- debug!("Received AMQP message for user {}: {}", self.user_id, payload);
+ debug!("Received AMQP message for user {}: {}", self.token_info.user_id, payload);
// Forward message to WebSocket
if let Err(e) = write.send(Message::Text(payload.to_string())).await {
- error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(&self.user_id), e);
+ error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(user_id), e);
} else {
// Acknowledge the AMQP message
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
- error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(&self.user_id), e);
+ error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(user_id), e);
}
info!("Message {:?} sent", payload);
}
}
Err(e) => {
- error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(&self.user_id), e);
+ error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(user_id), e);
}
}
}
@@ -317,7 +316,7 @@
match msg {
Some(Ok(msg)) => match msg {
Message::Text(text) => {
- debug!("Received message for {}: {}", self.user_id, text);
+ debug!("Received message for {}: {}", self.token_info.user_id, text);
match serde_json::from_str::<FarcasterMessage>(&text) {
Ok(farcaster_msg) => {
match &farcaster_msg.payload {
@@ -330,7 +329,7 @@
metricType = "TokenDistributor_ConnectionFailure",
metricValue = 1,
instanceId = self.config.instance_id,
- userId = redact_sensitive_data(&self.user_id),
+ userId = redact_sensitive_data(user_id),
errorType = "MessageHandlingFailed",
"Failed to handle refresh direct cast conversation: {:?}",
e
@@ -351,29 +350,29 @@
}
}
Message::Binary(_data) => {
- debug!("Received binary message for user: {}", self.user_id);
+ debug!("Received binary message for user: {}", user_id);
}
Message::Frame(_) => {
- debug!("Received raw frame for user: {}", self.user_id);
+ debug!("Received raw frame for user: {}", user_id);
}
Message::Ping(data) => {
let elapsed_since_last = last_ping.elapsed();
trace!("Received ping for user: {} ({}s since last ping), responding with pong",
- self.user_id, elapsed_since_last.as_secs());
+ user_id, elapsed_since_last.as_secs());
last_ping = Instant::now(); // Reset ping timeout
ping_timeout.as_mut().reset(Instant::now() + self.config.ping_timeout);
let _ = write.send(Message::Pong(data)).await;
}
Message::Pong(_data) => {
- trace!("Received pong for user: {}", self.user_id);
+ trace!("Received pong for user: {}", user_id);
}
Message::Close(close_frame) => {
let reason = if let Some(frame) = close_frame {
let msg = format!("code: {}, reason: {}", frame.code, frame.reason);
- error!("WebSocket closed for user {} - {}", redact_sensitive_data(&self.user_id), msg);
+ error!("WebSocket closed for user {} - {}", redact_sensitive_data(user_id), msg);
msg
} else {
- error!("WebSocket closed for user {} without close frame", redact_sensitive_data(&self.user_id));
+ error!("WebSocket closed for user {} without close frame", redact_sensitive_data(user_id));
"no close frame provided".to_string()
};
return Err(TokenConnectionError::WebSocketClosed(reason));
@@ -382,7 +381,7 @@
Some(Err(e)) => {
warn!(
"WebSocket protocol error for user {}: {:?}, connection will be restarted",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::WebSocketConnection(e));
@@ -390,7 +389,7 @@
None => {
info!(
"WebSocket stream ended unexpectedly for user: {}, connection will be restarted",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
return Err(TokenConnectionError::StreamEnded);
}
@@ -399,21 +398,21 @@
// Send heartbeat updates
_ = heartbeat_interval.tick() => {
- match self.db.update_token_heartbeat(&self.user_id, &self.config.instance_id).await {
+ match self.db.update_token_heartbeat(user_id, &self.config.instance_id).await {
Ok(true) => {
- trace!("Heartbeat updated successfully for user: {}", self.user_id);
+ trace!("Heartbeat updated successfully for user: {}", user_id);
}
Ok(false) => {
warn!(
"Lost token ownership for user: {} - another instance may have claimed it",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
return Err(TokenConnectionError::TokenOwnershipLost);
}
Err(e) => {
error!(
"Failed to update heartbeat for user {}: {:?}",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::HeartbeatFailed(format!("Database error: {}", e)));
@@ -426,14 +425,14 @@
let elapsed = last_ping.elapsed();
error!(
"Ping timeout for user: {} - no ping received for {}s, connection dead",
- redact_sensitive_data(&self.user_id), elapsed.as_secs()
+ redact_sensitive_data(user_id), elapsed.as_secs()
);
return Err(TokenConnectionError::PingTimeout);
}
// Handle cancellation
_ = cancellation_token.cancelled() => {
- info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(&self.user_id));
+ info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(user_id));
// Send close frame before terminating
let _ = write.send(Message::Close(None)).await;
@@ -461,7 +460,7 @@
.await?;
// Declare queue with unique name for this connection
- let queue_name = format!("{}_{}", topic_name, self.user_id);
+ let queue_name = format!("{}_{}", topic_name, self.token_info.user_id);
let queue = channel
.queue_declare(
&queue_name,
@@ -489,7 +488,7 @@
let consumer = channel
.basic_consume(
queue.name().as_str(),
- &format!("consumer_{}_{}", topic_name, self.user_id),
+ &format!("consumer_{}_{}", topic_name, self.token_info.user_id),
BasicConsumeOptions::default(),
FieldTable::default(),
)
@@ -497,7 +496,7 @@
debug!(
"AMQP consumer set up for topic: {} user: {}",
- topic_name, self.user_id
+ topic_name, self.token_info.user_id
);
Ok(consumer)
}
@@ -509,7 +508,7 @@
) -> Result<(), TokenConnectionError> {
debug!(
"Handling refresh direct cast conversation for user: {}",
- self.user_id
+ self.token_info.user_id
);
let conversation_id = &payload.conversation_id;
@@ -521,18 +520,17 @@
let notif_payload_future = async || {
let conversation = self
.farcaster_client
- .fetch_conversation(&self.user_id, conversation_id)
+ .fetch_conversation(&self.token_info.user_id, conversation_id)
.await
.map_err(|e| {
TokenConnectionError::MessageHandlingFailed(format!(
"Failed to fetch conversation details: {e:?}",
))
})?;
- let current_user_fid = extract_fid_from_dcs_token(&self.token_data);
let notif = prepare_notif_payload(
payload,
&conversation,
- current_user_fid.as_ref(),
+ Some(&self.token_info.fid),
);
Ok::<_, TokenConnectionError>(notif)
};
@@ -628,7 +626,7 @@
client: &mut ChainedInterceptedServicesAuthClient,
) -> Result<Vec<(String, PlatformDetails)>, TokenConnectionError> {
let request = PeersDeviceListsRequest {
- user_ids: vec![self.user_id.clone()],
+ user_ids: vec![self.token_info.user_id.clone()],
};
let mut user_devices_response = client
.get_device_lists_for_users(request)
@@ -643,7 +641,7 @@
let Some(user_devices) = user_devices_response
.users_devices_platform_details
- .remove(&self.user_id)
+ .remove(&self.token_info.user_id)
else {
return Ok(Vec::new());
};
@@ -722,21 +720,3 @@
Some(found_medias)
}
-
-/// Farcaster DCs token string has specific format, from which we can extract
-/// authenticated user's FID.
-/// The pattern is as follows: `fc_dc_${FID}_remainingtokenvalue`
-fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option<String> {
- const PREFIX: &str = "fc_dc_";
- const SEP: &str = "_";
-
- let stripped = fc_dcs_token.strip_prefix(PREFIX)?;
- let (fid, _rest) = stripped.split_once(SEP)?;
-
- let is_valid = fid.chars().all(|c: char| c.is_ascii_digit());
- if !is_valid {
- return None;
- }
-
- Some(fid.to_string())
-}

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 6, 6:10 PM (21 h, 8 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5840016
Default Alt Text
D15379.1765044615.diff (20 KB)

Event Timeline