Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32162122
D15379.1765044615.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
20 KB
Referenced Files
None
Subscribers
None
D15379.1765044615.diff
View Options
diff --git a/services/tunnelbroker/src/database/token_distributor.rs b/services/tunnelbroker/src/database/token_distributor.rs
--- a/services/tunnelbroker/src/database/token_distributor.rs
+++ b/services/tunnelbroker/src/database/token_distributor.rs
@@ -3,15 +3,21 @@
use comm_lib::aws::ddb::operation::update_item::UpdateItemError;
use comm_lib::aws::ddb::types::AttributeValue;
use comm_lib::database::shared_tables::farcaster_tokens;
-use comm_lib::database::{AttributeMap, Error};
+use comm_lib::database::{AttributeExtractor, AttributeMap, Error};
use futures_util::TryFutureExt;
use tracing::{debug, error};
+pub struct TokenEntryInfo {
+ pub user_id: String,
+ pub token_data: String,
+ pub fid: String,
+}
+
impl DatabaseClient {
pub async fn scan_orphaned_tokens(
&self,
timeout_threshold: u64,
- ) -> Result<Vec<(String, String)>, Error> {
+ ) -> Result<Vec<TokenEntryInfo>, Error> {
debug!(
"Starting scan for orphaned tokens - timeout_threshold: {}",
timeout_threshold
@@ -69,19 +75,17 @@
fn process_items(
items: impl IntoIterator<Item = AttributeMap>,
- ) -> impl Iterator<Item = (String, String)> {
- items.into_iter().filter_map(|item| {
- if let (
- Some(AttributeValue::S(user_id)),
- Some(AttributeValue::S(token_data_str)),
- ) = (
- item.get(farcaster_tokens::PARTITION_KEY),
- item.get(farcaster_tokens::FARCASTER_DCS_TOKEN),
- ) {
- Some((user_id.to_string(), token_data_str.to_string()))
- } else {
- None
- }
+ ) -> impl Iterator<Item = TokenEntryInfo> {
+ items.into_iter().filter_map(|mut item| {
+ let user_id = item.take_attr(farcaster_tokens::PARTITION_KEY).ok()?;
+ let token_data =
+ item.take_attr(farcaster_tokens::FARCASTER_DCS_TOKEN).ok()?;
+ let fid = item.take_attr(farcaster_tokens::FARCASTER_ID).ok()?;
+ Some(TokenEntryInfo {
+ user_id,
+ token_data,
+ fid,
+ })
})
}
diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs
--- a/services/tunnelbroker/src/token_distributor/mod.rs
+++ b/services/tunnelbroker/src/token_distributor/mod.rs
@@ -153,7 +153,9 @@
}
let mut claimed_count = 0;
- for (user_id, token_data) in orphaned_tokens {
+ for token_info in orphaned_tokens {
+ let user_id = token_info.user_id.clone();
+
if claimed_count >= available_slots {
info!(
"Reached maximum connections limit ({}), stopping token claiming",
@@ -198,11 +200,11 @@
"Starting WebSocket connection task for user: {}",
redact_sensitive_data(&user_id)
);
+
TokenConnection::start(
self.db.clone(),
self.config.clone(),
- user_id.clone(),
- token_data,
+ token_info,
self.amqp_connection.clone(),
cancel_token.clone(),
self.grpc_client.clone(),
@@ -211,7 +213,7 @@
);
// Store the cancellation token
- self.connections.insert(user_id.clone(), cancel_token);
+ self.connections.insert(user_id, cancel_token);
claimed_count += 1;
info!(
"Active connections: {}/{}",
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -2,6 +2,7 @@
use crate::amqp_client::utils::BasicMessageSender;
use crate::config::CONFIG;
use crate::constants::MEDIA_MIRROR_TIMEOUT;
+use crate::database::token_distributor::TokenEntryInfo;
use crate::database::DatabaseClient;
use crate::farcaster::FarcasterClient;
use crate::log::redact_sensitive_data;
@@ -32,8 +33,7 @@
pub(crate) struct TokenConnection {
db: DatabaseClient,
config: TokenDistributorConfig,
- user_id: String,
- token_data: String,
+ token_info: TokenEntryInfo,
amqp_connection: AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
message_sender: BasicMessageSender,
@@ -46,8 +46,7 @@
pub(crate) fn start(
db: DatabaseClient,
config: TokenDistributorConfig,
- user_id: String,
- token_data: String,
+ token_info: TokenEntryInfo,
amqp_connection: AmqpConnection,
cancellation_token: CancellationToken,
grpc_client: ChainedInterceptedServicesAuthClient,
@@ -57,11 +56,11 @@
let message_sender =
BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection));
+ let user_id = token_info.user_id.clone();
let connection = Self {
db: db.clone(),
config: config.clone(),
- user_id: user_id.clone(),
- token_data,
+ token_info,
amqp_connection: amqp_connection.clone(),
grpc_client,
blob_client: S2SAuthedBlobClient::new(
@@ -143,54 +142,55 @@
self,
cancellation_token: CancellationToken,
) -> Result<(), TokenConnectionError> {
+ let user_id = &self.token_info.user_id;
info!(
"Starting connection for user: {}",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
loop {
tokio::select! {
- result = self.connect_and_maintain(&self.token_data, &cancellation_token) => {
+ result = self.connect_and_maintain(&self.token_info.token_data, &cancellation_token) => {
match result {
Ok(_) => {
- info!("Connection completed normally for user: {}", redact_sensitive_data(&self.user_id));
+ info!("Connection completed normally for user: {}", redact_sensitive_data(user_id));
break;
}
Err(e) => {
warn!(
"Socket connection failed for user {}, reason: {}",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
// Check if we still own the token before retrying
debug!(
"Verifying token ownership for user {} before retry",
- self.user_id
+ self.token_info.user_id
);
- match self.db.update_token_heartbeat(&self.user_id, &self.config.instance_id).await {
+ match self.db.update_token_heartbeat(user_id, &self.config.instance_id).await {
Ok(true) => {
debug!(
"Token ownership confirmed for user {}, restarting socket in 5 seconds",
- self.user_id
+ user_id
);
tokio::time::sleep(Duration::from_secs(5)).await;
debug!(
"Attempting socket reconnect for user {}",
- self.user_id
+ user_id
);
}
Ok(false) => {
warn!(
"Lost token ownership for user {}, stopping reconnection attempts",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
return Err(TokenConnectionError::TokenOwnershipLost);
}
Err(err) => {
error!(
"Failed to verify token ownership for user {}: {:?}, retrying in 5 seconds",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
err
);
tokio::time::sleep(Duration::from_secs(5)).await;
@@ -200,7 +200,7 @@
}
}
_ = cancellation_token.cancelled() => {
- info!("Connection cancelled for user: {}", redact_sensitive_data(&self.user_id));
+ info!("Connection cancelled for user: {}", redact_sensitive_data(user_id));
return Err(TokenConnectionError::Cancelled);
}
}
@@ -208,7 +208,7 @@
info!(
"TokenConnection ended for user: {}",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
Ok(())
}
@@ -218,17 +218,16 @@
farcaster_token: &str,
cancellation_token: &CancellationToken,
) -> Result<(), TokenConnectionError> {
+ let user_id = &self.token_info.user_id;
+
debug!(
"Establishing WebSocket connection for user {} to {}",
- self.user_id, self.config.farcaster_websocket_url
+ user_id, self.config.farcaster_websocket_url
);
let (ws_stream, _) =
connect_async(&self.config.farcaster_websocket_url).await?;
- debug!(
- "WebSocket connected successfully for user: {}",
- self.user_id
- );
+ debug!("WebSocket connected successfully for user: {}", user_id);
let (mut write, mut read) = ws_stream.split();
@@ -237,11 +236,11 @@
"messageType": "dc_authenticate",
"data": farcaster_token
});
- debug!("Sending authentication message for user: {}", self.user_id);
+ debug!("Sending authentication message for user: {}", user_id);
if let Err(e) = write.send(Message::Text(auth_msg.to_string())).await {
error!(
"Failed to send auth message for user {}: {:?}, connection will be retried",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::AuthenticationFailed(format!(
@@ -252,17 +251,17 @@
info!(
"WebSocket connected and authenticated successfully for user: {}",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
// Set up AMQP topic listener for farcaster messages
- let topic_name = format!("farcaster_user_{}", self.user_id);
+ let topic_name = format!("farcaster_user_{}", user_id);
let mut amqp_consumer = match self.setup_amqp_consumer(&topic_name).await {
Ok(consumer) => consumer,
Err(e) => {
error!(
"Failed to setup AMQP consumer for user {}: {}",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::AmqpSetupFailed(format!(
@@ -279,7 +278,7 @@
trace!(
"Ping timeout monitoring active for user: {} - timeout: {}s",
- self.user_id,
+ user_id,
self.config.ping_timeout.as_secs()
);
@@ -293,21 +292,21 @@
match delivery_result {
Ok(delivery) => {
let payload = String::from_utf8_lossy(&delivery.data);
- debug!("Received AMQP message for user {}: {}", self.user_id, payload);
+ debug!("Received AMQP message for user {}: {}", self.token_info.user_id, payload);
// Forward message to WebSocket
if let Err(e) = write.send(Message::Text(payload.to_string())).await {
- error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(&self.user_id), e);
+ error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(user_id), e);
} else {
// Acknowledge the AMQP message
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
- error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(&self.user_id), e);
+ error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(user_id), e);
}
info!("Message {:?} sent", payload);
}
}
Err(e) => {
- error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(&self.user_id), e);
+ error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(user_id), e);
}
}
}
@@ -317,7 +316,7 @@
match msg {
Some(Ok(msg)) => match msg {
Message::Text(text) => {
- debug!("Received message for {}: {}", self.user_id, text);
+ debug!("Received message for {}: {}", self.token_info.user_id, text);
match serde_json::from_str::<FarcasterMessage>(&text) {
Ok(farcaster_msg) => {
match &farcaster_msg.payload {
@@ -330,7 +329,7 @@
metricType = "TokenDistributor_ConnectionFailure",
metricValue = 1,
instanceId = self.config.instance_id,
- userId = redact_sensitive_data(&self.user_id),
+ userId = redact_sensitive_data(user_id),
errorType = "MessageHandlingFailed",
"Failed to handle refresh direct cast conversation: {:?}",
e
@@ -351,29 +350,29 @@
}
}
Message::Binary(_data) => {
- debug!("Received binary message for user: {}", self.user_id);
+ debug!("Received binary message for user: {}", user_id);
}
Message::Frame(_) => {
- debug!("Received raw frame for user: {}", self.user_id);
+ debug!("Received raw frame for user: {}", user_id);
}
Message::Ping(data) => {
let elapsed_since_last = last_ping.elapsed();
trace!("Received ping for user: {} ({}s since last ping), responding with pong",
- self.user_id, elapsed_since_last.as_secs());
+ user_id, elapsed_since_last.as_secs());
last_ping = Instant::now(); // Reset ping timeout
ping_timeout.as_mut().reset(Instant::now() + self.config.ping_timeout);
let _ = write.send(Message::Pong(data)).await;
}
Message::Pong(_data) => {
- trace!("Received pong for user: {}", self.user_id);
+ trace!("Received pong for user: {}", user_id);
}
Message::Close(close_frame) => {
let reason = if let Some(frame) = close_frame {
let msg = format!("code: {}, reason: {}", frame.code, frame.reason);
- error!("WebSocket closed for user {} - {}", redact_sensitive_data(&self.user_id), msg);
+ error!("WebSocket closed for user {} - {}", redact_sensitive_data(user_id), msg);
msg
} else {
- error!("WebSocket closed for user {} without close frame", redact_sensitive_data(&self.user_id));
+ error!("WebSocket closed for user {} without close frame", redact_sensitive_data(user_id));
"no close frame provided".to_string()
};
return Err(TokenConnectionError::WebSocketClosed(reason));
@@ -382,7 +381,7 @@
Some(Err(e)) => {
warn!(
"WebSocket protocol error for user {}: {:?}, connection will be restarted",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::WebSocketConnection(e));
@@ -390,7 +389,7 @@
None => {
info!(
"WebSocket stream ended unexpectedly for user: {}, connection will be restarted",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
return Err(TokenConnectionError::StreamEnded);
}
@@ -399,21 +398,21 @@
// Send heartbeat updates
_ = heartbeat_interval.tick() => {
- match self.db.update_token_heartbeat(&self.user_id, &self.config.instance_id).await {
+ match self.db.update_token_heartbeat(user_id, &self.config.instance_id).await {
Ok(true) => {
- trace!("Heartbeat updated successfully for user: {}", self.user_id);
+ trace!("Heartbeat updated successfully for user: {}", user_id);
}
Ok(false) => {
warn!(
"Lost token ownership for user: {} - another instance may have claimed it",
- redact_sensitive_data(&self.user_id)
+ redact_sensitive_data(user_id)
);
return Err(TokenConnectionError::TokenOwnershipLost);
}
Err(e) => {
error!(
"Failed to update heartbeat for user {}: {:?}",
- redact_sensitive_data(&self.user_id),
+ redact_sensitive_data(user_id),
e
);
return Err(TokenConnectionError::HeartbeatFailed(format!("Database error: {}", e)));
@@ -426,14 +425,14 @@
let elapsed = last_ping.elapsed();
error!(
"Ping timeout for user: {} - no ping received for {}s, connection dead",
- redact_sensitive_data(&self.user_id), elapsed.as_secs()
+ redact_sensitive_data(user_id), elapsed.as_secs()
);
return Err(TokenConnectionError::PingTimeout);
}
// Handle cancellation
_ = cancellation_token.cancelled() => {
- info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(&self.user_id));
+ info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(user_id));
// Send close frame before terminating
let _ = write.send(Message::Close(None)).await;
@@ -461,7 +460,7 @@
.await?;
// Declare queue with unique name for this connection
- let queue_name = format!("{}_{}", topic_name, self.user_id);
+ let queue_name = format!("{}_{}", topic_name, self.token_info.user_id);
let queue = channel
.queue_declare(
&queue_name,
@@ -489,7 +488,7 @@
let consumer = channel
.basic_consume(
queue.name().as_str(),
- &format!("consumer_{}_{}", topic_name, self.user_id),
+ &format!("consumer_{}_{}", topic_name, self.token_info.user_id),
BasicConsumeOptions::default(),
FieldTable::default(),
)
@@ -497,7 +496,7 @@
debug!(
"AMQP consumer set up for topic: {} user: {}",
- topic_name, self.user_id
+ topic_name, self.token_info.user_id
);
Ok(consumer)
}
@@ -509,7 +508,7 @@
) -> Result<(), TokenConnectionError> {
debug!(
"Handling refresh direct cast conversation for user: {}",
- self.user_id
+ self.token_info.user_id
);
let conversation_id = &payload.conversation_id;
@@ -521,18 +520,17 @@
let notif_payload_future = async || {
let conversation = self
.farcaster_client
- .fetch_conversation(&self.user_id, conversation_id)
+ .fetch_conversation(&self.token_info.user_id, conversation_id)
.await
.map_err(|e| {
TokenConnectionError::MessageHandlingFailed(format!(
"Failed to fetch conversation details: {e:?}",
))
})?;
- let current_user_fid = extract_fid_from_dcs_token(&self.token_data);
let notif = prepare_notif_payload(
payload,
&conversation,
- current_user_fid.as_ref(),
+ Some(&self.token_info.fid),
);
Ok::<_, TokenConnectionError>(notif)
};
@@ -628,7 +626,7 @@
client: &mut ChainedInterceptedServicesAuthClient,
) -> Result<Vec<(String, PlatformDetails)>, TokenConnectionError> {
let request = PeersDeviceListsRequest {
- user_ids: vec![self.user_id.clone()],
+ user_ids: vec![self.token_info.user_id.clone()],
};
let mut user_devices_response = client
.get_device_lists_for_users(request)
@@ -643,7 +641,7 @@
let Some(user_devices) = user_devices_response
.users_devices_platform_details
- .remove(&self.user_id)
+ .remove(&self.token_info.user_id)
else {
return Ok(Vec::new());
};
@@ -722,21 +720,3 @@
Some(found_medias)
}
-
-/// Farcaster DCs token string has specific format, from which we can extract
-/// authenticated user's FID.
-/// The pattern is as follows: `fc_dc_${FID}_remainingtokenvalue`
-fn extract_fid_from_dcs_token(fc_dcs_token: &str) -> Option<String> {
- const PREFIX: &str = "fc_dc_";
- const SEP: &str = "_";
-
- let stripped = fc_dcs_token.strip_prefix(PREFIX)?;
- let (fid, _rest) = stripped.split_once(SEP)?;
-
- let is_valid = fid.chars().all(|c: char| c.is_ascii_digit());
- if !is_valid {
- return None;
- }
-
- Some(fid.to_string())
-}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Dec 6, 6:10 PM (21 h, 8 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5840016
Default Alt Text
D15379.1765044615.diff (20 KB)
Attached To
Mode
D15379: [tunnelbroker] Use FID from tokens table for notifs
Attached
Detach File
Event Timeline
Log In to Comment