# Review notes for this patch. Text placed before the first "diff --git"
# header is skipped as leading garbage by `git apply` and GNU `patch`, so
# these lines do not change what the patch applies.
#
# Purpose: adds aws_dynamodb_table.farcaster-tokens to the list in the shared
# module's outputs.tf. It also adds CloudWatch log metric filters for
# TokenDistributor events (token_distributor_metrics.tf) and emits matching
# structured `info!` events (metricType / metricValue / instanceId fields)
# from token_distributor/mod.rs and token_connection.rs.
#
# NOTE(review): the filters read `$.fields.metricType`, `$.fields.metricValue`,
#   `$.fields.instanceId` and `$.fields.errorType`. This presumably relies on
#   the tunnelbroker tracing subscriber writing JSON with event fields nested
#   under "fields" (tracing-subscriber's JSON formatter without flatten_event).
#   Confirm this; otherwise none of these filters will ever match.
# NOTE(review): token_connection.rs maps TokenConnectionError::Cancelled to
#   errorType "Cancelled" and reports it as TokenDistributor_ConnectionFailure.
#   run() may return Err(Cancelled) when the cancellation token fires, e.g.
#   during graceful_shutdown (not visible here; verify). If so, every shutdown
#   inflates the failure metric. Consider skipping the metric for Cancelled.
# NOTE(review): each TokenDistributor_ConnectionFailure log line is matched by
#   two filters: the "ConnectionFailure" entry of
#   local.token_distributor_metrics and
#   token_distributor_connection_failures_by_type. This yields both
#   TokenDistributorConnectionFailure and
#   TokenDistributorConnectionFailureByType. Drop one if the duplication is
#   not intentional.
# NOTE(review): TotalTokensCount comes from db.get_total_tokens_count() and is
#   emitted by every instance with an InstanceId dimension. The value is
#   presumably table-wide, so a SUM across instances over-counts. Use
#   Maximum/Average in dashboards and alarms.
# NOTE(review): log_group_name "/ecs/tunnelbroker-task-def" is hard-coded in
#   both filter resources. If that log group is managed in Terraform,
#   reference the resource (or a shared local) so the dependency and creation
#   order are explicit.
# NOTE(review): emit_metrics uses error_types::DDB_ERROR. Confirm it is in
#   scope in token_distributor/mod.rs; the imports are outside the hunks shown.
diff --git a/services/terraform/modules/shared/outputs.tf b/services/terraform/modules/shared/outputs.tf --- a/services/terraform/modules/shared/outputs.tf +++ b/services/terraform/modules/shared/outputs.tf @@ -7,6 +7,7 @@ aws_dynamodb_table.tunnelbroker-undelivered-messages, aws_dynamodb_table.identity-users, aws_dynamodb_table.identity-reserved-usernames, + aws_dynamodb_table.farcaster-tokens, ] } diff --git a/services/terraform/remote/token_distributor_metrics.tf b/services/terraform/remote/token_distributor_metrics.tf new file mode 100644 --- /dev/null +++ b/services/terraform/remote/token_distributor_metrics.tf @@ -0,0 +1,48 @@ +locals { + token_distributor_metrics = { + ActiveConnections = { name = "ActiveConnections", pattern = "TokenDistributor_ActiveConnections" }, + TotalTokensCount = { name = "TotalTokensCount", pattern = "TokenDistributor_TotalTokensCount" }, + TokenClaimed = { name = "TokenClaimed", pattern = "TokenDistributor_TokenClaimed" }, + TokenReleased = { name = "TokenReleased", pattern = "TokenDistributor_TokenReleased" }, + TokenClaimFailure = { name = "TokenClaimFailure", pattern = "TokenDistributor_TokenClaimFailure" }, + OrphanedTokensFound = { name = "OrphanedTokensFound", pattern = "TokenDistributor_OrphanedTokensFound" }, + DeadConnectionsCleaned = { name = "DeadConnectionsCleaned", pattern = "TokenDistributor_DeadConnectionsCleaned" }, + ConnectionFailure = { name = "ConnectionFailure", pattern = "TokenDistributor_ConnectionFailure" }, + InstanceStarted = { name = "InstanceStarted", pattern = "TokenDistributor_InstanceStarted" } + } +} + +# CloudWatch Log Metric Filters for TokenDistributor metrics +resource "aws_cloudwatch_log_metric_filter" "token_distributor_metrics" { + for_each = local.token_distributor_metrics + + name = "TokenDistributor${each.value.name}" + pattern = "{ $.fields.metricType = \"${each.value.pattern}\" }" + log_group_name = "/ecs/tunnelbroker-task-def" + + metric_transformation { + name =
"TokenDistributor${each.value.name}" + namespace = "TokenDistributor" + value = "$.fields.metricValue" + dimensions = { + InstanceId = "$.fields.instanceId" + } + } +} + +# Special metric filters with additional dimensions +resource "aws_cloudwatch_log_metric_filter" "token_distributor_connection_failures_by_type" { + name = "TokenDistributorConnectionFailureByType" + pattern = "{ $.fields.metricType = \"TokenDistributor_ConnectionFailure\" }" + log_group_name = "/ecs/tunnelbroker-task-def" + + metric_transformation { + name = "TokenDistributorConnectionFailureByType" + namespace = "TokenDistributor" + value = "$.fields.metricValue" + dimensions = { + InstanceId = "$.fields.instanceId" + ErrorType = "$.fields.errorType" + } + } +} diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -33,6 +33,15 @@ config.metrics_interval.as_secs() ); + // Emit initial metrics + info!( + metricType = "TokenDistributor_InstanceStarted", + metricValue = 1, + instanceId = config.instance_id, + maxConnections = config.max_connections, + "TokenDistributor instance started" + ); + Self { db, config, @@ -113,6 +122,14 @@ debug!("No orphaned tokens found during scan"); } else { info!("Found {} orphaned tokens to process", orphaned_tokens.len()); + + // Emit orphaned tokens metric + info!( + metricType = "TokenDistributor_OrphanedTokensFound", + metricValue = orphaned_tokens.len(), + instanceId = self.config.instance_id, + "Orphaned tokens discovered during scan" + ); } let mut claimed_count = 0; @@ -144,6 +161,15 @@ available_slots ); + // Emit token claimed metric + info!( + metricType = "TokenDistributor_TokenClaimed", + metricValue = 1, + instanceId = self.config.instance_id, + userId = user_id, + "Token successfully claimed" + ); + // Create cancellation token for this connection let cancel_token =
CancellationToken::new(); @@ -177,6 +203,15 @@ "Database error while claiming token for user {}: {:?}", user_id, e ); + + // Emit token claim failure metric + info!( + metricType = "TokenDistributor_TokenClaimFailure", + metricValue = 1, + instanceId = self.config.instance_id, + userId = user_id, + "Token claim failed due to database error" + ); } } } @@ -198,13 +233,44 @@ let cleaned_count = initial_count - self.connections.len(); if cleaned_count > 0 { debug!("Cleaned up {} dead connections", cleaned_count); + + // Emit dead connections cleaned metric + info!( + metricType = "TokenDistributor_DeadConnectionsCleaned", + metricValue = cleaned_count, + instanceId = self.config.instance_id, + "Dead connections cleaned up" + ); } } async fn emit_metrics(&self) { - //TODO: implement metrics - } + // Emit current active connections metric + info!( + metricType = "TokenDistributor_ActiveConnections", + metricValue = self.connections.len(), + instanceId = self.config.instance_id, + "Current active connections count" + ); + // Emit total tokens count metric + match self.db.get_total_tokens_count().await { + Ok(total_tokens) => { + info!( + metricType = "TokenDistributor_TotalTokensCount", + metricValue = total_tokens, + instanceId = self.config.instance_id, + "Total tokens count in database" + ); + } + Err(e) => { + error!( + errorType = error_types::DDB_ERROR, + "Failed to get total tokens count: {:?}", e + ); + } + } + } async fn graceful_shutdown(&mut self) -> Result<(), Error> { info!("Starting graceful shutdown..."); @@ -227,6 +293,15 @@ match db.release_token(&user_id_clone, instance_id).await { Ok(true) => { debug!("Released token for user: {}", user_id_clone); + + // Emit token released metric + info!( + metricType = "TokenDistributor_TokenReleased", + metricValue = 1, + instanceId = instance_id, + userId = user_id_clone, + "Token successfully released during shutdown" + ); } Ok(false) => { debug!("Token for user {} already released", user_id_clone); diff --git
a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -34,11 +34,39 @@ if let Err(e) = connection.run(cancellation_token).await { error!("TokenConnection failed for user {}: {:?}", user_id, e); + // Emit connection failure metric with specific error type + let error_type = match &e { + TokenConnectionError::PingTimeout => "PingTimeout", + TokenConnectionError::WebSocketConnection(_) => "WebSocketConnection", + TokenConnectionError::AuthenticationFailed(_) => { + "AuthenticationFailed" + } + TokenConnectionError::WebSocketClosed(_) => "WebSocketClosed", + TokenConnectionError::StreamEnded => "StreamEnded", + TokenConnectionError::DatabaseError(_) => "DatabaseError", + TokenConnectionError::TokenOwnershipLost => "TokenOwnershipLost", + TokenConnectionError::HeartbeatFailed(_) => "HeartbeatFailed", + TokenConnectionError::Cancelled => "Cancelled", + }; + + info!( + metricType = "TokenDistributor_ConnectionFailure", + metricValue = 1, + instanceId = config.instance_id, + userId = user_id, + errorType = error_type, + "Connection failure occurred" + ); + // Clean up token in database on connection failure if let Err(release_err) = db.release_token(&user_id, &config.instance_id).await { - warn!("Failed to release token for user {} after connection failure: {:?}", user_id, release_err); + warn!( + "Failed to release token for user {} after connection failure: {:?}", + user_id, + release_err + ); } } });