diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -49,48 +49,64 @@ }; tokio::spawn(async move { - if let Err(e) = connection.run(cancellation_token).await { - error!("TokenConnection failed for user {}: {:?}", user_id, e); - - // Emit connection failure metric with specific error type - let error_type = match &e { - TokenConnectionError::PingTimeout => "PingTimeout", - TokenConnectionError::WebSocketConnection(_) => "WebSocketConnection", - TokenConnectionError::AuthenticationFailed(_) => { - "AuthenticationFailed" - } - TokenConnectionError::WebSocketClosed(_) => "WebSocketClosed", - TokenConnectionError::StreamEnded => "StreamEnded", - TokenConnectionError::DatabaseError(_) => "DatabaseError", - TokenConnectionError::TokenOwnershipLost => "TokenOwnershipLost", - TokenConnectionError::HeartbeatFailed(_) => "HeartbeatFailed", - TokenConnectionError::Cancelled => "Cancelled", - TokenConnectionError::AmqpSetupFailed(_) => "AmqpSetupFailed", - TokenConnectionError::MessageHandlingFailed(_) => { - "MessageHandlingFailed" - } - }; - - info!( - metricType = "TokenDistributor_ConnectionFailure", - metricValue = 1, - instanceId = config.instance_id, - userId = user_id, - errorType = error_type, - "Connection failure occurred" - ); + let result = connection.run(cancellation_token.clone()).await; - // Clean up token in database on connection failure - if let Err(release_err) = - db.release_token(&user_id, &config.instance_id).await - { - warn!( - "Failed to release token for user {} after connection failure: {:?}", - user_id, - release_err + match result { + Ok(_) => { + info!( + "TokenConnection completed successfully for user: {}", + user_id ); } + Err(e) => { + error!("TokenConnection failed for user {}: {:?}", user_id, e); + + // Emit connection failure metric with specific error type + let error_type = match &e { + TokenConnectionError::PingTimeout => "PingTimeout", + TokenConnectionError::WebSocketConnection(_) => { + "WebSocketConnection" + } + TokenConnectionError::AuthenticationFailed(_) => { + "AuthenticationFailed" + } + TokenConnectionError::WebSocketClosed(_) => "WebSocketClosed", + TokenConnectionError::StreamEnded => "StreamEnded", + TokenConnectionError::DatabaseError(_) => "DatabaseError", + TokenConnectionError::TokenOwnershipLost => "TokenOwnershipLost", + TokenConnectionError::HeartbeatFailed(_) => "HeartbeatFailed", + TokenConnectionError::Cancelled => "Cancelled", + TokenConnectionError::AmqpSetupFailed(_) => "AmqpSetupFailed", + TokenConnectionError::MessageHandlingFailed(_) => { + "MessageHandlingFailed" + } + }; + + info!( + metricType = "TokenDistributor_ConnectionFailure", + metricValue = 1, + instanceId = config.instance_id, + userId = user_id, + errorType = error_type, + "Connection failure occurred" + ); + + // Clean up token in database on connection failure + if let Err(release_err) = + db.release_token(&user_id, &config.instance_id).await + { + warn!( + "Failed to release token for user {} after connection failure: {:?}", + user_id, + release_err + ); + } + } } + + // This ensures cleanup_dead_connections() will remove it from the HashMap + cancellation_token.cancel(); + debug!("Cancelled token for user {} - connection ended", user_id); }); }