Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32185936
D15229.1765084229.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
11 KB
Referenced Files
None
Subscribers
None
D15229.1765084229.diff
View Options
diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs
--- a/services/tunnelbroker/src/database/mod.rs
+++ b/services/tunnelbroker/src/database/mod.rs
@@ -18,6 +18,7 @@
pub mod message;
pub mod message_id;
+pub mod token_distributor;
use crate::database::message_id::MessageID;
pub use message::*;
diff --git a/services/tunnelbroker/src/database/token_distributor.rs b/services/tunnelbroker/src/database/token_distributor.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/database/token_distributor.rs
@@ -0,0 +1,379 @@
+use crate::constants::error_types;
+use crate::database::DatabaseClient;
+use comm_lib::aws::ddb::operation::update_item::UpdateItemError;
+use comm_lib::aws::ddb::types::AttributeValue;
+use comm_lib::database::shared_tables::farcaster_tokens;
+use comm_lib::database::{AttributeMap, Error};
+use futures_util::TryFutureExt;
+use tracing::{debug, error};
+
+impl DatabaseClient {
+  /// Collects every token that is currently eligible to be claimed.
+  ///
+  /// Two result sets are fetched concurrently and concatenated
+  /// (unassigned first, then expired):
+  ///
+  /// * **unassigned** - a Query on `farcaster_tokens::UNASSIGNED_INDEX` for
+  ///   items whose `UNASSIGNED` attribute equals `"true"`;
+  /// * **expired** - a Scan of
+  ///   `farcaster_tokens::ASSIGNED_INSTANCE_LAST_HEARTBEAT_INDEX` filtered to
+  ///   items whose `LAST_HEARTBEAT` is strictly below `timeout_threshold`.
+  ///
+  /// Both requests follow `LastEvaluatedKey` until it is exhausted. This
+  /// matters most for the Scan: DynamoDB applies the filter *after* reading
+  /// a page, so a page may come back empty while later pages still contain
+  /// matching items.
+  ///
+  /// `timeout_threshold` is compared numerically against `LAST_HEARTBEAT`,
+  /// which `claim_token` and `update_token_heartbeat` write as Unix seconds.
+  ///
+  /// Returns `(user_id, token_data)` pairs. Items missing either string
+  /// attribute are skipped silently.
+  ///
+  /// # Errors
+  /// Returns `Error::AwsSdk` as soon as any DynamoDB request fails.
+  pub async fn scan_orphaned_tokens(
+    &self,
+    timeout_threshold: u64,
+  ) -> Result<Vec<(String, String)>, Error> {
+    debug!(
+      "Starting scan for orphaned tokens - timeout_threshold: {}",
+      timeout_threshold
+    );
+
+    // Query unassigned tokens using the sparse index, page by page.
+    let unassigned_future = async {
+      let mut items: Vec<AttributeMap> = Vec::new();
+      let mut exclusive_start_key = None;
+      loop {
+        let page = self
+          .client
+          .query()
+          .table_name(farcaster_tokens::TABLE_NAME)
+          .index_name(farcaster_tokens::UNASSIGNED_INDEX)
+          .key_condition_expression("#unassigned = :unassigned_val")
+          .expression_attribute_names(
+            "#unassigned",
+            farcaster_tokens::UNASSIGNED,
+          )
+          .expression_attribute_values(
+            ":unassigned_val",
+            AttributeValue::S("true".to_string()),
+          )
+          .set_exclusive_start_key(exclusive_start_key)
+          .send()
+          .map_err(|e| {
+            error!(
+              errorType = error_types::DDB_ERROR,
+              "Failed to query unassigned tokens: {:?}", e
+            );
+            Error::AwsSdk(e.into())
+          })
+          .await?;
+
+        items.extend(page.items.unwrap_or_default());
+        match page.last_evaluated_key {
+          Some(key) if !key.is_empty() => exclusive_start_key = Some(key),
+          _ => break,
+        }
+      }
+      Ok::<_, Error>(items)
+    };
+
+    // Expired tokens cannot be fetched with a Query because the owning
+    // instance (the index partition key) is unknown, so the composite index
+    // is scanned and filtered on the heartbeat instead.
+    let expired_future = async {
+      let mut items: Vec<AttributeMap> = Vec::new();
+      let mut exclusive_start_key = None;
+      loop {
+        let page = self
+          .client
+          .scan()
+          .table_name(farcaster_tokens::TABLE_NAME)
+          .index_name(farcaster_tokens::ASSIGNED_INSTANCE_LAST_HEARTBEAT_INDEX)
+          .filter_expression("#last_heartbeat < :timeout")
+          .expression_attribute_names(
+            "#last_heartbeat",
+            farcaster_tokens::LAST_HEARTBEAT,
+          )
+          .expression_attribute_values(
+            ":timeout",
+            AttributeValue::N(timeout_threshold.to_string()),
+          )
+          .set_exclusive_start_key(exclusive_start_key)
+          .send()
+          .map_err(|e| {
+            error!(
+              errorType = error_types::DDB_ERROR,
+              "Failed to query expired tokens: {:?}", e
+            );
+            Error::AwsSdk(e.into())
+          })
+          .await?;
+
+        items.extend(page.items.unwrap_or_default());
+        match page.last_evaluated_key {
+          Some(key) if !key.is_empty() => exclusive_start_key = Some(key),
+          _ => break,
+        }
+      }
+      Ok::<_, Error>(items)
+    };
+
+    // Execute both paginated requests in parallel; the first error wins.
+    let (unassigned_items, expired_items) =
+      tokio::try_join!(unassigned_future, expired_future)?;
+
+    // Extracts `(user_id, token_data)` from each item, dropping items that
+    // lack either attribute or store it as a non-string type.
+    fn process_items(
+      items: impl IntoIterator<Item = AttributeMap>,
+    ) -> impl Iterator<Item = (String, String)> {
+      items.into_iter().filter_map(|mut item| {
+        match (
+          item.remove(farcaster_tokens::PARTITION_KEY),
+          item.remove(farcaster_tokens::FARCASTER_DCS_TOKEN),
+        ) {
+          (
+            Some(AttributeValue::S(user_id)),
+            Some(AttributeValue::S(token_data_str)),
+          ) => Some((user_id, token_data_str)),
+          _ => None,
+        }
+      })
+    }
+
+    debug!(
+      "Found {} unassigned tokens and {} expired tokens in database",
+      unassigned_items.len(),
+      expired_items.len()
+    );
+
+    let mut orphaned_tokens =
+      Vec::with_capacity(unassigned_items.len() + expired_items.len());
+    orphaned_tokens.extend(process_items(unassigned_items));
+    orphaned_tokens.extend(process_items(expired_items));
+
+    debug!(
+      "Processed {} total orphaned tokens for claiming",
+      orphaned_tokens.len()
+    );
+    Ok(orphaned_tokens)
+  }
+
+  /// Tries to assign the token of `user_id` to `instance_id`.
+  ///
+  /// A single conditional `UpdateItem` sets `ASSIGNED_INSTANCE`,
+  /// `ASSIGNMENT_TIMESTAMP` and `LAST_HEARTBEAT` (both to the current Unix
+  /// time in seconds) and removes the `UNASSIGNED` marker. The write only
+  /// goes through when the item has no `ASSIGNED_INSTANCE` attribute or its
+  /// `LAST_HEARTBEAT` is strictly below `timeout_threshold`.
+  ///
+  /// Returns `Ok(true)` when the claim was written and `Ok(false)` when the
+  /// condition check rejected it.
+  ///
+  /// # Errors
+  /// Returns `Error::AwsSdk` for any other DynamoDB failure.
+  ///
+  /// # Panics
+  /// Panics if the system clock reports a time before the Unix epoch.
+  pub async fn claim_token(
+    &self,
+    user_id: &str,
+    instance_id: &str,
+    timeout_threshold: u64,
+  ) -> Result<bool, Error> {
+    debug!(
+      "Attempting to claim token for user: {} by instance: {}",
+      user_id, instance_id
+    );
+
+    let current_secs = std::time::UNIX_EPOCH
+      .elapsed()
+      .expect("Time went backwards")
+      .as_secs();
+
+    let update_expression = "SET \
+      #assigned_instance = :instance_id, \
+      #assignment_timestamp = :now, \
+      #last_heartbeat = :now \
+      REMOVE #unassigned";
+    let condition_expression = "attribute_not_exists(#assigned_instance) \
+      OR #last_heartbeat < :timeout";
+
+    let outcome = self
+      .client
+      .update_item()
+      .table_name(farcaster_tokens::TABLE_NAME)
+      .key(
+        farcaster_tokens::PARTITION_KEY,
+        AttributeValue::S(user_id.to_string()),
+      )
+      .update_expression(update_expression)
+      .condition_expression(condition_expression)
+      .expression_attribute_names(
+        "#assigned_instance",
+        farcaster_tokens::ASSIGNED_INSTANCE,
+      )
+      .expression_attribute_names(
+        "#assignment_timestamp",
+        farcaster_tokens::ASSIGNMENT_TIMESTAMP,
+      )
+      .expression_attribute_names(
+        "#last_heartbeat",
+        farcaster_tokens::LAST_HEARTBEAT,
+      )
+      .expression_attribute_names("#unassigned", farcaster_tokens::UNASSIGNED)
+      .expression_attribute_values(
+        ":instance_id",
+        AttributeValue::S(instance_id.to_string()),
+      )
+      .expression_attribute_values(
+        ":now",
+        AttributeValue::N(current_secs.to_string()),
+      )
+      .expression_attribute_values(
+        ":timeout",
+        AttributeValue::N(timeout_threshold.to_string()),
+      )
+      .send()
+      .await;
+
+    // Success path first; everything below deals with the failure.
+    let service_error = match outcome {
+      Ok(_) => {
+        debug!(
+          "Token successfully claimed for user: {} by instance: {}",
+          user_id, instance_id
+        );
+        return Ok(true);
+      }
+      Err(sdk_error) => sdk_error.into_service_error(),
+    };
+
+    // A failed condition is an expected outcome, not an error.
+    if matches!(
+      service_error,
+      UpdateItemError::ConditionalCheckFailedException(_)
+    ) {
+      debug!(
+        "Token claim failed for user: {} - already claimed or condition not met",
+        user_id
+      );
+      return Ok(false);
+    }
+
+    error!(
+      errorType = error_types::DDB_ERROR,
+      "Database error claiming token for user {}: {:?}", user_id, service_error
+    );
+    Err(Error::AwsSdk(service_error.into()))
+  }
+
+  /// Refreshes `LAST_HEARTBEAT` on the token of `user_id`.
+  ///
+  /// The attribute is set to the current Unix time in seconds, but only if
+  /// the item's `ASSIGNED_INSTANCE` equals `instance_id`.
+  ///
+  /// Returns `Ok(true)` when the heartbeat was written and `Ok(false)` when
+  /// the condition check rejected the update.
+  ///
+  /// # Errors
+  /// Returns `Error::AwsSdk` for any other DynamoDB failure.
+  ///
+  /// # Panics
+  /// Panics if the system clock reports a time before the Unix epoch.
+  pub async fn update_token_heartbeat(
+    &self,
+    user_id: &str,
+    instance_id: &str,
+  ) -> Result<bool, Error> {
+    let current_secs = std::time::UNIX_EPOCH
+      .elapsed()
+      .expect("Time went backwards")
+      .as_secs();
+
+    let outcome = self
+      .client
+      .update_item()
+      .table_name(farcaster_tokens::TABLE_NAME)
+      .key(
+        farcaster_tokens::PARTITION_KEY,
+        AttributeValue::S(user_id.to_string()),
+      )
+      .update_expression("SET #last_heartbeat = :now")
+      .condition_expression("#assigned_instance = :instance_id")
+      .expression_attribute_names(
+        "#assigned_instance",
+        farcaster_tokens::ASSIGNED_INSTANCE,
+      )
+      .expression_attribute_names(
+        "#last_heartbeat",
+        farcaster_tokens::LAST_HEARTBEAT,
+      )
+      .expression_attribute_values(
+        ":instance_id",
+        AttributeValue::S(instance_id.to_string()),
+      )
+      .expression_attribute_values(
+        ":now",
+        AttributeValue::N(current_secs.to_string()),
+      )
+      .send()
+      .await
+      .map_err(|sdk_error| sdk_error.into_service_error());
+
+    match outcome {
+      Ok(_) => Ok(true),
+      // Ownership changed (or the item is gone): report it, don't fail.
+      Err(UpdateItemError::ConditionalCheckFailedException(_)) => Ok(false),
+      Err(service_error) => {
+        error!(
+          errorType = error_types::DDB_ERROR,
+          "Failed to update token heartbeat: {:?}", service_error
+        );
+        Err(Error::AwsSdk(service_error.into()))
+      }
+    }
+  }
+
+  /// Lists the user IDs whose tokens are assigned to `instance_id`.
+  ///
+  /// Runs a Query on
+  /// `farcaster_tokens::ASSIGNED_INSTANCE_LAST_HEARTBEAT_INDEX` keyed by
+  /// `ASSIGNED_INSTANCE` and follows `LastEvaluatedKey` until every page has
+  /// been read, so the result is complete even when the instance holds more
+  /// tokens than fit into a single response.
+  ///
+  /// Items whose partition key is missing or is not a string are skipped.
+  ///
+  /// # Errors
+  /// Returns `Error::AwsSdk` as soon as any page request fails.
+  pub async fn get_user_ids_with_token_for_instance(
+    &self,
+    instance_id: &str,
+  ) -> Result<Vec<String>, Error> {
+    let mut user_ids = Vec::new();
+    let mut exclusive_start_key = None;
+
+    loop {
+      let page = self
+        .client
+        .query()
+        .table_name(farcaster_tokens::TABLE_NAME)
+        .index_name(farcaster_tokens::ASSIGNED_INSTANCE_LAST_HEARTBEAT_INDEX)
+        .key_condition_expression("#assigned_instance = :instance_id")
+        .expression_attribute_names(
+          "#assigned_instance",
+          farcaster_tokens::ASSIGNED_INSTANCE,
+        )
+        .expression_attribute_values(
+          ":instance_id",
+          AttributeValue::S(instance_id.to_string()),
+        )
+        .set_exclusive_start_key(exclusive_start_key)
+        .send()
+        .await
+        .map_err(|e| {
+          error!(
+            errorType = error_types::DDB_ERROR,
+            "Failed to query tokens for instance {}: {:?}", instance_id, e
+          );
+          Error::AwsSdk(e.into())
+        })?;
+
+      // Move the key out of each owned item instead of cloning it.
+      user_ids.extend(page.items.unwrap_or_default().into_iter().filter_map(
+        |mut item| match item.remove(farcaster_tokens::PARTITION_KEY) {
+          Some(AttributeValue::S(user_id)) => Some(user_id),
+          _ => None,
+        },
+      ));
+
+      // A present, non-empty key means DynamoDB has more pages to return.
+      match page.last_evaluated_key {
+        Some(key) if !key.is_empty() => exclusive_start_key = Some(key),
+        _ => break,
+      }
+    }
+
+    Ok(user_ids)
+  }
+
+  /// Gives up the token of `user_id` if it is held by `instance_id`.
+  ///
+  /// A single conditional `UpdateItem` sets the `UNASSIGNED` marker to
+  /// `"true"` and removes `ASSIGNED_INSTANCE`, `ASSIGNMENT_TIMESTAMP` and
+  /// `LAST_HEARTBEAT`. The write only goes through when the item's
+  /// `ASSIGNED_INSTANCE` equals `instance_id`.
+  ///
+  /// Returns `Ok(true)` when the token was released and `Ok(false)` when the
+  /// condition check rejected the update.
+  ///
+  /// # Errors
+  /// Returns `Error::AwsSdk` for any other DynamoDB failure.
+  pub async fn release_token(
+    &self,
+    user_id: &str,
+    instance_id: &str,
+  ) -> Result<bool, Error> {
+    debug!(
+      "Attempting to release token for user: {} from instance: {}",
+      user_id, instance_id
+    );
+
+    let update_expression = "SET \
+      #unassigned = :unassigned_val \
+      REMOVE \
+      #assigned_instance, \
+      #assignment_timestamp, \
+      #last_heartbeat";
+
+    let outcome = self
+      .client
+      .update_item()
+      .table_name(farcaster_tokens::TABLE_NAME)
+      .key(
+        farcaster_tokens::PARTITION_KEY,
+        AttributeValue::S(user_id.to_string()),
+      )
+      .update_expression(update_expression)
+      .condition_expression("#assigned_instance = :instance_id")
+      .expression_attribute_names("#unassigned", farcaster_tokens::UNASSIGNED)
+      .expression_attribute_names(
+        "#assigned_instance",
+        farcaster_tokens::ASSIGNED_INSTANCE,
+      )
+      .expression_attribute_names(
+        "#assignment_timestamp",
+        farcaster_tokens::ASSIGNMENT_TIMESTAMP,
+      )
+      .expression_attribute_names(
+        "#last_heartbeat",
+        farcaster_tokens::LAST_HEARTBEAT,
+      )
+      .expression_attribute_values(
+        ":unassigned_val",
+        AttributeValue::S("true".to_string()),
+      )
+      .expression_attribute_values(
+        ":instance_id",
+        AttributeValue::S(instance_id.to_string()),
+      )
+      .send()
+      .await;
+
+    // Success path first; everything below deals with the failure.
+    let service_error = match outcome {
+      Ok(_) => {
+        debug!(
+          "Token successfully released for user: {} from instance: {}",
+          user_id, instance_id
+        );
+        return Ok(true);
+      }
+      Err(sdk_error) => sdk_error.into_service_error(),
+    };
+
+    // A failed condition is an expected outcome, not an error.
+    if matches!(
+      service_error,
+      UpdateItemError::ConditionalCheckFailedException(_)
+    ) {
+      debug!(
+        "Token release failed for user: {} - already released or owned by different instance",
+        user_id
+      );
+      return Ok(false);
+    }
+
+    error!(
+      errorType = error_types::DDB_ERROR,
+      "Database error releasing token for user {}: {:?}", user_id, service_error
+    );
+    Err(Error::AwsSdk(service_error.into()))
+  }
+
+  /// Counts all items in the `farcaster_tokens` table.
+  ///
+  /// Issues a Scan with `Select = COUNT` so no item data is returned. Each
+  /// response only reports the count for the page it covered, so the
+  /// per-page counts are summed while following `LastEvaluatedKey` until
+  /// the table is exhausted.
+  ///
+  /// # Errors
+  /// Returns `Error::AwsSdk` as soon as any page request fails.
+  pub async fn get_total_tokens_count(&self) -> Result<usize, Error> {
+    debug!("Getting total tokens count from DynamoDB");
+
+    let mut count: usize = 0;
+    let mut exclusive_start_key = None;
+
+    loop {
+      let page = self
+        .client
+        .scan()
+        .table_name(farcaster_tokens::TABLE_NAME)
+        // Typed variant instead of parsing the string "COUNT" at runtime.
+        .select(comm_lib::aws::ddb::types::Select::Count)
+        .set_exclusive_start_key(exclusive_start_key)
+        .send()
+        .await
+        .map_err(|e| {
+          error!(
+            errorType = error_types::DDB_ERROR,
+            "Failed to get total tokens count: {:?}", e
+          );
+          Error::AwsSdk(e.into())
+        })?;
+
+      // `count` is a signed 32-bit value in the SDK; a negative value is
+      // never expected, but it is treated as 0 rather than wrapped.
+      count =
+        count.saturating_add(usize::try_from(page.count).unwrap_or_default());
+
+      // A present, non-empty key means DynamoDB has more pages to count.
+      match page.last_evaluated_key {
+        Some(key) if !key.is_empty() => exclusive_start_key = Some(key),
+        _ => break,
+      }
+    }
+
+    debug!("Total tokens count: {}", count);
+    Ok(count)
+  }
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Dec 7, 5:10 AM (10 h, 39 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5842560
Default Alt Text
D15229.1765084229.diff (11 KB)
Attached To
Mode
D15229: [tunnelbroker] implement Tunnelbroker API for farcaster-tokens table
Attached
Detach File
Event Timeline
Log In to Comment