diff --git a/lib/facts/staff.js b/lib/facts/staff.js --- a/lib/facts/staff.js +++ b/lib/facts/staff.js @@ -1,5 +1,7 @@ // @flow +// NOTE: keep this array in sync with +// `shared/comm-lib/src/sensitive_data.rs` const staff: $ReadOnlyArray = [ '256', '518252', diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs --- a/services/identity/src/constants.rs +++ b/services/identity/src/constants.rs @@ -312,8 +312,9 @@ // Tracing -pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; -pub const REDACT_SENSITIVE_DATA: &str = "REDACT_SENSITIVE_DATA"; +pub use comm_lib::constants::env_var::{ + COMM_SERVICES_USE_JSON_LOGS, REDACT_SENSITIVE_DATA, +}; // Regex @@ -333,7 +334,4 @@ pub const MAX_ONE_TIME_KEYS: usize = 100; // as defined in olm // Comm staff -pub mod staff { - pub const STAFF_USER_IDS: [&str; 1] = ["256"]; - pub const AUTHORITATIVE_KEYSERVER_OWNER_USER_ID: &str = "256"; -} +pub use comm_lib::constants::staff; diff --git a/services/identity/src/log.rs b/services/identity/src/log.rs --- a/services/identity/src/log.rs +++ b/services/identity/src/log.rs @@ -1,18 +1,7 @@ +use comm_lib::sensitive_data::base_redact_sensitive_data; + use crate::config::CONFIG; pub fn redact_sensitive_data(sensitive_data: &str) -> &str { - const ALLOWED_VALUES: [&str; 2] = [ - "ashoat", - crate::constants::staff::AUTHORITATIVE_KEYSERVER_OWNER_USER_ID, - ]; - - if ALLOWED_VALUES.contains(&sensitive_data) { - return sensitive_data; - } - - if CONFIG.redact_sensitive_data { - "REDACTED" - } else { - sensitive_data - } + base_redact_sensitive_data(sensitive_data, CONFIG.redact_sensitive_data) } diff --git a/services/terraform/remote/service_tunnelbroker.tf b/services/terraform/remote/service_tunnelbroker.tf --- a/services/terraform/remote/service_tunnelbroker.tf +++ b/services/terraform/remote/service_tunnelbroker.tf @@ -111,6 +111,10 @@ { name = "COMM_SERVICES_USE_JSON_LOGS", value = local.comm_services_use_json_logs + }, + { + name = "REDACT_SENSITIVE_DATA", + value = local.is_staging ? "false" : "true" } ] secrets = [ diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs --- a/services/tunnelbroker/src/config.rs +++ b/services/tunnelbroker/src/config.rs @@ -78,6 +78,10 @@ #[arg(env = "TOKEN_DISTRIBUTOR_PING_TIMEOUT")] #[arg(long, default_value_t = 60)] pub token_distributor_ping_timeout: u64, + /// Redact sensitive data + #[arg(long, global = true, default_value_t = false)] + #[arg(env = comm_lib::constants::env_var::REDACT_SENSITIVE_DATA)] + pub redact_sensitive_data: bool, } /// Stores configuration parsed from command-line arguments diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -16,7 +16,8 @@ pub const ENV_FCM_CONFIG: &str = "FCM_CONFIG"; pub const ENV_WEB_PUSH_CONFIG: &str = "WEB_PUSH_CONFIG"; pub const ENV_WNS_CONFIG: &str = "WNS_CONFIG"; -pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; +pub const COMM_SERVICES_USE_JSON_LOGS: &str = + comm_lib::constants::env_var::COMM_SERVICES_USE_JSON_LOGS; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; diff --git a/services/tunnelbroker/src/log.rs b/services/tunnelbroker/src/log.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/log.rs @@ -0,0 +1,7 @@ +use comm_lib::sensitive_data::base_redact_sensitive_data; + +use crate::config::CONFIG; + +pub fn redact_sensitive_data(sensitive_data: &str) -> &str { + base_redact_sensitive_data(sensitive_data, CONFIG.redact_sensitive_data) +} diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -6,6 +6,7 @@ pub mod farcaster; pub mod grpc; pub mod identity; +pub mod log; pub mod notifs; pub mod token_distributor; pub mod websockets; diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -2,11 +2,11 @@ mod error; mod token_connection; -use crate::amqp_client::amqp::AmqpConnection; use crate::constants::error_types; use crate::database::DatabaseClient; pub(crate) use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::token_connection::TokenConnection; +use crate::{amqp_client::amqp::AmqpConnection, log::redact_sensitive_data}; use comm_lib::database::Error; use futures_util::future; use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; @@ -167,7 +167,7 @@ Ok(true) => { info!( "Successfully claimed token for user: {} (claimed {}/{})", - user_id, + redact_sensitive_data(&user_id), claimed_count + 1, available_slots ); @@ -177,7 +177,7 @@ metricType = "TokenDistributor_TokenClaimed", metricValue = 1, instanceId = self.config.instance_id, - userId = user_id, + userId = redact_sensitive_data(&user_id), "Token successfully claimed" ); @@ -185,7 +185,10 @@ let cancel_token = CancellationToken::new(); // Spawn TokenConnection task - info!("Starting WebSocket connection task for user: {}", user_id); + info!( + "Starting WebSocket connection task for user: {}", + redact_sensitive_data(&user_id) + ); TokenConnection::start( self.db.clone(), self.config.clone(), @@ -214,7 +217,8 @@ Err(e) => { warn!( "Database error while claiming token for user {}: {:?}", - user_id, e + redact_sensitive_data(&user_id), + e ); // Emit token claim failure metric @@ -222,7 +226,7 @@ metricType = "TokenDistributor_TokenClaimFailure", metricValue = 1, instanceId = self.config.instance_id, - userId = user_id, + userId = redact_sensitive_data(&user_id), "Token claim failed due to database error" ); } @@ -312,7 +316,7 @@ metricType = "TokenDistributor_TokenReleased", metricValue = 1, instanceId = instance_id, - userId = user_id_clone, + userId = redact_sensitive_data(&user_id_clone), "Token successfully released during shutdown" ); } @@ -322,7 +326,8 @@ Err(e) => { warn!( "Failed to release token for user {}: {:?}", - user_id_clone, e + redact_sensitive_data(&user_id_clone), + e ); } } diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -2,6 +2,7 @@ send_message_to_device, AmqpChannel, AmqpConnection, }; use crate::database::DatabaseClient; +use crate::log::redact_sensitive_data; use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::error::TokenConnectionError; use futures_util::{SinkExt, StreamExt}; @@ -55,11 +56,15 @@ Ok(_) => { info!( "TokenConnection completed successfully for user: {}", - user_id + redact_sensitive_data(&user_id) ); } Err(e) => { - error!("TokenConnection failed for user {}: {:?}", user_id, e); + error!( + "TokenConnection failed for user {}: {:?}", + redact_sensitive_data(&user_id), + e + ); // Emit connection failure metric with specific error type let error_type = match &e { @@ -86,7 +91,7 @@ metricType = "TokenDistributor_ConnectionFailure", metricValue = 1, instanceId = config.instance_id, - userId = user_id, + userId = redact_sensitive_data(&user_id), errorType = error_type, "Connection failure occurred" ); @@ -97,7 +102,7 @@ { warn!( "Failed to release token for user {} after connection failure: {:?}", - user_id, + redact_sensitive_data(&user_id), release_err ); } @@ -114,20 +119,23 @@ self, cancellation_token: CancellationToken, ) -> Result<(), TokenConnectionError> { - info!("Starting connection for user: {}", self.user_id); + info!( + "Starting connection for user: {}", + redact_sensitive_data(&self.user_id) + ); loop { tokio::select! { result = self.connect_and_maintain(&self.token_data, &cancellation_token) => { match result { Ok(_) => { - info!("Connection completed normally for user: {}", self.user_id); + info!("Connection completed normally for user: {}", redact_sensitive_data(&self.user_id)); break; } Err(e) => { warn!( "Socket connection failed for user {}, reason: {}", - self.user_id, + redact_sensitive_data(&self.user_id), e ); @@ -151,14 +159,14 @@ Ok(false) => { warn!( "Lost token ownership for user {}, stopping reconnection attempts", - self.user_id + redact_sensitive_data(&self.user_id) ); return Err(TokenConnectionError::TokenOwnershipLost); } Err(err) => { error!( "Failed to verify token ownership for user {}: {:?}, retrying in 5 seconds", - self.user_id, + redact_sensitive_data(&self.user_id), err ); tokio::time::sleep(Duration::from_secs(5)).await; @@ -168,13 +176,16 @@ } } _ = cancellation_token.cancelled() => { - info!("Connection cancelled for user: {}", self.user_id); + info!("Connection cancelled for user: {}", redact_sensitive_data(&self.user_id)); return Err(TokenConnectionError::Cancelled); } } } - info!("TokenConnection ended for user: {}", self.user_id); + info!( + "TokenConnection ended for user: {}", + redact_sensitive_data(&self.user_id) + ); Ok(()) } @@ -206,7 +217,7 @@ if let Err(e) = write.send(Message::Text(auth_msg.to_string())).await { error!( "Failed to send auth message for user {}: {:?}, connection will be retried", - self.user_id, + redact_sensitive_data(&self.user_id), e ); return Err(TokenConnectionError::AuthenticationFailed(format!( @@ -217,7 +228,7 @@ info!( "WebSocket connected and authenticated successfully for user: {}", - self.user_id + redact_sensitive_data(&self.user_id) ); // Set up AMQP topic listener for farcaster messages @@ -227,7 +238,8 @@ Err(e) => { error!( "Failed to setup AMQP consumer for user {}: {}", - self.user_id, e + redact_sensitive_data(&self.user_id), + e ); return Err(TokenConnectionError::AmqpSetupFailed(format!( "Failed to setup AMQP consumer: {}", @@ -261,17 +273,17 @@ // Forward message to WebSocket if let Err(e) = write.send(Message::Text(payload.to_string())).await { - error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", self.user_id, e); + error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", redact_sensitive_data(&self.user_id), e); } else { // Acknowledge the AMQP message if let Err(e) = delivery.ack(BasicAckOptions::default()).await { - error!("Failed to acknowledge AMQP message for user {}: {:?}", self.user_id, e); + error!("Failed to acknowledge AMQP message for user {}: {:?}", redact_sensitive_data(&self.user_id), e); } info!("Message {:?} sent", payload); } } Err(e) => { - error!("AMQP consumer error for user {}: {:?}", self.user_id, e); + error!("AMQP consumer error for user {}: {:?}", redact_sensitive_data(&self.user_id), e); } } } @@ -294,7 +306,7 @@ metricType = "TokenDistributor_ConnectionFailure", metricValue = 1, instanceId = self.config.instance_id, - userId = self.user_id, + userId = redact_sensitive_data(&self.user_id), errorType = "MessageHandlingFailed", "Failed to handle refresh direct cast conversation: {:?}", e @@ -334,10 +346,10 @@ Message::Close(close_frame) => { let reason = if let Some(frame) = close_frame { let msg = format!("code: {}, reason: {}", frame.code, frame.reason); - error!("WebSocket closed for user {} - {}", self.user_id, msg); + error!("WebSocket closed for user {} - {}", redact_sensitive_data(&self.user_id), msg); msg } else { - error!("WebSocket closed for user {} without close frame", self.user_id); + error!("WebSocket closed for user {} without close frame", redact_sensitive_data(&self.user_id)); "no close frame provided".to_string() }; return Err(TokenConnectionError::WebSocketClosed(reason)); @@ -346,7 +358,7 @@ Some(Err(e)) => { warn!( "WebSocket protocol error for user {}: {:?}, connection will be restarted", - self.user_id, + redact_sensitive_data(&self.user_id), e ); return Err(TokenConnectionError::WebSocketConnection(e)); @@ -354,7 +366,7 @@ None => { info!( "WebSocket stream ended unexpectedly for user: {}, connection will be restarted", - self.user_id + redact_sensitive_data(&self.user_id) ); return Err(TokenConnectionError::StreamEnded); } @@ -370,14 +382,14 @@ Ok(false) => { warn!( "Lost token ownership for user: {} - another instance may have claimed it", - self.user_id + redact_sensitive_data(&self.user_id) ); return Err(TokenConnectionError::TokenOwnershipLost); } Err(e) => { error!( "Failed to update heartbeat for user {}: {:?}", - self.user_id, + redact_sensitive_data(&self.user_id), e ); return Err(TokenConnectionError::HeartbeatFailed(format!("Database error: {}", e))); @@ -390,14 +402,14 @@ let elapsed = last_ping.elapsed(); error!( "Ping timeout for user: {} - no ping received for {}s, connection dead", - self.user_id, elapsed.as_secs() + redact_sensitive_data(&self.user_id), elapsed.as_secs() ); return Err(TokenConnectionError::PingTimeout); } // Handle cancellation _ = cancellation_token.cancelled() => { - info!("Connection cancelled for user: {}, closing WebSocket", self.user_id); + info!("Connection cancelled for user: {}, closing WebSocket", redact_sensitive_data(&self.user_id)); // Send close frame before terminating let _ = write.send(Message::Close(None)).await; diff --git a/shared/comm-lib/src/constants.rs b/shared/comm-lib/src/constants.rs --- a/shared/comm-lib/src/constants.rs +++ b/shared/comm-lib/src/constants.rs @@ -22,3 +22,16 @@ /// token in service requests. pub const DISABLE_CSAT_VERIFICATION_ENV_VAR: &str = "COMM_SERVICES_DISABLE_CSAT_VERIFICATION"; + +pub mod env_var { + // Tracing + + pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; + pub const REDACT_SENSITIVE_DATA: &str = "REDACT_SENSITIVE_DATA"; +} + +// Comm staff +pub mod staff { + pub const STAFF_USER_IDS: [&str; 1] = ["256"]; + pub const AUTHORITATIVE_KEYSERVER_OWNER_USER_ID: &str = "256"; +} diff --git a/shared/comm-lib/src/lib.rs b/shared/comm-lib/src/lib.rs --- a/shared/comm-lib/src/lib.rs +++ b/shared/comm-lib/src/lib.rs @@ -8,6 +8,7 @@ pub mod database; #[cfg(feature = "http")] pub mod http; +pub mod sensitive_data; pub mod shared; pub mod tools; diff --git a/shared/comm-lib/src/sensitive_data.rs b/shared/comm-lib/src/sensitive_data.rs new file mode 100644 --- /dev/null +++ b/shared/comm-lib/src/sensitive_data.rs @@ -0,0 +1,35 @@ +const ALLOWED_VALUES: [&str; 17] = [ + "ashoat", + // NOTE: staff IDs below. Keep these in sync with `lib/facts/staff.js` + "256", + "518252", + "379341", + "1509097", + "1329299", + "1589929", + "1629414", + "2231519", + "2775177", + "2815499", + "3033752", + "3079802", + "3197947", + "6142155", + "6646635", + "9AAFD445-F64D-4557-A3F5-79387E95E9BA", +]; + +pub fn base_redact_sensitive_data( + sensitive_data: &str, + should_redact: bool, +) -> &str { + if ALLOWED_VALUES.contains(&sensitive_data) { + return sensitive_data; + } + + if should_redact { + "REDACTED" + } else { + sensitive_data + } +}