diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5691,7 +5691,9 @@
 dependencies = [
  "futures-util",
  "log",
+ "native-tls",
  "tokio",
+ "tokio-native-tls",
  "tungstenite 0.18.0",
 ]
@@ -6032,6 +6034,7 @@
  "http 0.2.12",
  "httparse",
  "log",
+ "native-tls",
  "rand 0.8.5",
  "sha1",
  "thiserror 1.0.61",
@@ -6083,6 +6086,8 @@
  "tokio",
  "tokio-executor-trait",
  "tokio-reactor-trait",
+ "tokio-tungstenite 0.18.0",
+ "tokio-util",
  "tonic 0.8.3",
  "tonic-build 0.8.4",
  "tracing",
diff --git a/Cargo.toml b/Cargo.toml
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,29 +4,29 @@
 # We prefer the wildcard approach because it's easier to exclude
 # these from Dockerfiles with `sed`
 members = [
-    # All packages in these directories, except explicitly excluded
-    "services/*",
-    "shared/*",
-    # Other packages
-    "keyserver/addons/rust-node-addon",
+  # All packages in these directories, except explicitly excluded
+  "services/*",
+  "shared/*",
+  # Other packages
+  "keyserver/addons/rust-node-addon",
 ]
 exclude = [
-    # These directories are not Rust services
-    "services/electron-update-server",
-    "services/terraform",
-    "services/scripts",
-    "services/node_modules",
-    "shared/protos",
-    "shared/cmake",
-    # search-index-lambda has no common dependencies
-    "services/search-index-lambda",
-    # These fail to compile while in workspace
-    "web/backup-client-wasm",
-    "web/opaque-ke-wasm",
-    "native/native_rust_library",
-    "services/.idea",
-    "shared/.idea",
+  # These directories are not Rust services
+  "services/electron-update-server",
+  "services/terraform",
+  "services/scripts",
+  "services/node_modules",
+  "shared/protos",
+  "shared/cmake",
+  # search-index-lambda has no common dependencies
+  "services/search-index-lambda",
+  # These fail to compile while in workspace
+  "web/backup-client-wasm",
+  "web/opaque-ke-wasm",
+  "native/native_rust_library",
+  "services/.idea",
+  "shared/.idea",
 ]
 
 [workspace.package]
@@ -93,6 +93,7 @@
 tokio-stream = "0.1.14"
 tokio-tungstenite = "0.18.0"
 tokio-tungstenite-wasm = "0.2.1"
+tokio-util = "0.7.11"
 tonic = "0.8.3"
 tonic-web = "0.9.1"
 tower-http = "0.4"
diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml
--- a/services/tunnelbroker/Cargo.toml
+++ b/services/tunnelbroker/Cargo.toml
@@ -11,8 +11,8 @@
 anyhow = { workspace = true }
 clap = { workspace = true, features = ["derive", "env"] }
 comm-lib = { path = "../../shared/comm-lib", features = [
-    "aws",
-    "grpc_clients",
+  "aws",
+  "grpc_clients",
 ] }
 futures-util = { workspace = true }
 grpc_clients = { path = "../../shared/grpc_clients" }
@@ -21,7 +21,8 @@
 once_cell = { workspace = true }
 prost = { workspace = true }
 serde_json = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
+tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
+tokio-tungstenite = { workspace = true, features = ["native-tls"] }
 tonic = "0.8"
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
@@ -32,12 +33,13 @@
 uuid = { workspace = true, features = ["v4"] }
 jsonwebtoken = "9.3.0"
 web-push = { version = "0.11.0", features = [
-    "hyper-client",
+  "hyper-client",
 ], default-features = false }
 reqwest = { workspace = true, features = ["json", "native-tls", "rustls-tls"] }
 serde.workspace = true
 tokio-executor-trait = "2.1"
 tokio-reactor-trait = "1.1"
+tokio-util = { workspace = true }
 
 [build-dependencies]
 tonic-build = "0.8"
diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -54,6 +54,30 @@
   #[arg(env = "FARCASTER_API_URL")]
   #[arg(long, default_value = "https://client.farcaster.xyz/")]
   pub farcaster_api_url: reqwest::Url,
+  /// Farcaster WebSocket URL
+  #[arg(env = "FARCASTER_WEBSOCKET_URL")]
+  #[arg(long, default_value = "wss://ws.farcaster.xyz/stream")]
+  pub farcaster_websocket_url: String,
+  /// Token distributor scan interval (seconds)
+  #[arg(env = "TOKEN_DISTRIBUTOR_SCAN_INTERVAL")]
+  #[arg(long, default_value_t = 30)]
+  pub token_distributor_scan_interval: u64,
+  /// Token distributor heartbeat interval (seconds)
+  #[arg(env = "TOKEN_DISTRIBUTOR_HEARTBEAT_INTERVAL")]
+  #[arg(long, default_value_t = 30)]
+  pub token_distributor_heartbeat_interval: u64,
+  /// Token distributor heartbeat timeout (seconds)
+  #[arg(env = "TOKEN_DISTRIBUTOR_HEARTBEAT_TIMEOUT")]
+  #[arg(long, default_value_t = 120)]
+  pub token_distributor_heartbeat_timeout: u64,
+  /// Maximum connections per token distributor instance
+  #[arg(env = "TOKEN_DISTRIBUTOR_MAX_CONNECTIONS")]
+  #[arg(long, default_value_t = 100)]
+  pub token_distributor_max_connections: usize,
+  /// Ping timeout for WebSocket connections (seconds)
+  #[arg(env = "TOKEN_DISTRIBUTOR_PING_TIMEOUT")]
+  #[arg(long, default_value_t = 60)]
+  pub token_distributor_ping_timeout: u64,
 }
 
 /// Stores configuration parsed from command-line arguments
diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -7,10 +7,12 @@
 pub mod grpc;
 pub mod identity;
 pub mod notifs;
+pub mod token_distributor;
 pub mod websockets;
 
 use crate::farcaster::FarcasterClient;
 use crate::notifs::NotifClient;
+use crate::token_distributor::{TokenDistributor, TokenDistributorConfig};
 use amqp_client::amqp;
 use anyhow::{anyhow, Result};
 use config::CONFIG;
@@ -66,6 +68,10 @@
     farcaster_client.clone(),
   );
 
+  let token_config = TokenDistributorConfig::default();
+  let mut token_distributor =
+    TokenDistributor::new(db_client.clone(), token_config);
+
   tokio::select! {
    grpc_result = grpc_server => {
      grpc_result.map_err(|err| anyhow!("gRPC server failed: {:?}", err))
@@ -73,5 +79,8 @@
     ws_result = websocket_server => {
       ws_result.map_err(|err| anyhow!("WS server failed: {:?}", err))
     },
+    token_result = token_distributor.start() => {
+      token_result.map_err(|err| anyhow!("Token distributor failed: {:?}", err))
+    },
   }
 }
diff --git a/services/tunnelbroker/src/token_distributor/config.rs b/services/tunnelbroker/src/token_distributor/config.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/token_distributor/config.rs
@@ -0,0 +1,35 @@
+use crate::config::CONFIG;
+use std::time::Duration;
+
+#[derive(Clone)]
+pub struct TokenDistributorConfig {
+  pub instance_id: String,
+  pub scan_interval: Duration,
+  pub heartbeat_interval: Duration,
+  pub heartbeat_timeout: Duration,
+  pub farcaster_websocket_url: String,
+  pub max_connections: usize,
+  pub ping_timeout: Duration,
+  pub metrics_interval: Duration,
+}
+
+impl Default for TokenDistributorConfig {
+  fn default() -> Self {
+    Self {
+      instance_id: uuid::Uuid::new_v4().to_string(),
+      scan_interval: Duration::from_secs(
+        CONFIG.token_distributor_scan_interval,
+      ),
+      heartbeat_interval: Duration::from_secs(
+        CONFIG.token_distributor_heartbeat_interval,
+      ),
+      heartbeat_timeout: Duration::from_secs(
+        CONFIG.token_distributor_heartbeat_timeout,
+      ),
+      farcaster_websocket_url: CONFIG.farcaster_websocket_url.clone(),
+      max_connections: CONFIG.token_distributor_max_connections,
+      ping_timeout: Duration::from_secs(CONFIG.token_distributor_ping_timeout),
+      metrics_interval: Duration::from_secs(5 * 60), // 5 minutes
+    }
+  }
+}
diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/token_distributor/mod.rs
@@ -0,0 +1,243 @@
+mod config;
+
+use crate::constants::error_types;
+use crate::database::DatabaseClient;
+pub(crate) use crate::token_distributor::config::TokenDistributorConfig;
+use comm_lib::database::Error;
+use futures_util::future;
+use std::collections::HashMap;
+use tokio::time::interval;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, warn};
+
+pub struct TokenDistributor {
+  db: DatabaseClient,
+  config: TokenDistributorConfig,
+  connections: HashMap<String, CancellationToken>,
+}
+
+impl TokenDistributor {
+  pub fn new(db: DatabaseClient, config: TokenDistributorConfig) -> Self {
+    info!(
+      "Initializing TokenDistributor - max_connections: {}, \
+       scan_interval: {}s, heartbeat_interval: {}s, heartbeat_timeout: {}s, \
+       ping_timeout: {}s, metrics_interval: {}s",
+      config.max_connections,
+      config.scan_interval.as_secs(),
+      config.heartbeat_interval.as_secs(),
+      config.heartbeat_timeout.as_secs(),
+      config.ping_timeout.as_secs(),
+      config.metrics_interval.as_secs()
+    );
+
+    Self {
+      db,
+      config,
+      connections: HashMap::new(),
+    }
+  }
+
+  pub async fn start(&mut self) -> Result<(), Error> {
+    info!(
+      "Starting TokenDistributor with instance_id: {}",
+      self.config.instance_id
+    );
+
+    let mut scan_interval = interval(self.config.scan_interval);
+    let mut metrics_interval = interval(self.config.metrics_interval);
+
+    loop {
+      tokio::select! {
+        _ = scan_interval.tick() => {
+          if let Err(e) = self.scan_and_claim_tokens().await {
+            error!(
+              errorType = error_types::DDB_ERROR,
+              "Failed to scan and claim tokens: {:?}", e
+            );
+          }
+        }
+        _ = metrics_interval.tick() => {
+          self.emit_metrics().await;
+        }
+        _ = tokio::signal::ctrl_c() => {
+          info!("Received kill signal, starting graceful shutdown");
+          break;
+        }
+      }
+    }
+
+    self.graceful_shutdown().await?;
+
+    Ok(())
+  }
+
+  async fn scan_and_claim_tokens(&mut self) -> Result<(), Error> {
+    // Clean up dead connections first
+    self.cleanup_dead_connections();
+
+    let timeout_threshold = std::time::SystemTime::now()
+      .duration_since(std::time::UNIX_EPOCH)
+      .expect("Time went backwards")
+      .as_secs()
+      - self.config.heartbeat_timeout.as_secs();
+
+    // Calculate how many tokens we can still handle
+    let available_slots = self
+      .config
+      .max_connections
+      .saturating_sub(self.connections.len());
+
+    if available_slots == 0 {
+      debug!(
+        "Already at maximum connections ({}), skipping token scan",
+        self.config.max_connections
+      );
+      return Ok(());
+    }
+
+    debug!(
+      "Scanning for orphaned tokens with timeout_threshold: {} ({}s ago), \
+       available_slots: {}",
+      timeout_threshold,
+      self.config.heartbeat_timeout.as_secs(),
+      available_slots
+    );
+
+    let orphaned_tokens =
+      self.db.scan_orphaned_tokens(timeout_threshold).await?;
+
+    if orphaned_tokens.is_empty() {
+      debug!("No orphaned tokens found during scan");
+    } else {
+      info!("Found {} orphaned tokens to process", orphaned_tokens.len());
+    }
+
+    let mut claimed_count = 0;
+    for (user_id, token_data) in orphaned_tokens {
+      if claimed_count >= available_slots {
+        info!(
+          "Reached maximum connections limit ({}), stopping token claiming",
+          self.config.max_connections
+        );
+        break;
+      }
+
+      if self.connections.contains_key(&user_id) {
+        debug!("Already managing token for user: {}", user_id);
+        continue;
+      }
+
+      // Try to claim the token
+      match self
+        .db
+        .claim_token(&user_id, &self.config.instance_id, timeout_threshold)
+        .await
+      {
+        Ok(true) => {
+          info!(
+            "Successfully claimed token for user: {} (claimed {}/{})",
+            user_id,
+            claimed_count + 1,
+            available_slots
+          );
+
+          // Create cancellation token for this connection
+          let cancel_token = CancellationToken::new();
+
+          // Spawn TokenConnection task
+          info!("Starting WebSocket connection task for user: {}", user_id);
+          //TODO
+
+          // Store the cancellation token
+          self.connections.insert(user_id.clone(), cancel_token);
+          claimed_count += 1;
+          info!(
+            "Active connections: {}/{}",
+            self.connections.len(),
+            self.config.max_connections
+          );
+        }
+        Ok(false) => {
+          debug!(
+            "Token for user {} already claimed by another instance",
+            user_id
+          );
+        }
+        Err(e) => {
+          warn!(
+            "Database error while claiming token for user {}: {:?}",
+            user_id, e
+          );
+        }
+      }
+    }
+
+    Ok(())
+  }
+
+  fn cleanup_dead_connections(&mut self) {
+    let initial_count = self.connections.len();
+    self.connections.retain(|user_id, cancel_token| {
+      if cancel_token.is_cancelled() {
+        debug!("Removing dead connection for user: {}", user_id);
+        false
+      } else {
+        true
+      }
+    });
+
+    let cleaned_count = initial_count - self.connections.len();
+    if cleaned_count > 0 {
+      debug!("Cleaned up {} dead connections", cleaned_count);
+    }
+  }
+
+  async fn emit_metrics(&self) {
+    //TODO: implement metrics
+  }
+
+  async fn graceful_shutdown(&mut self) -> Result<(), Error> {
+    info!("Starting graceful shutdown...");
+
+    let user_ids: Vec<String> = self.connections.keys().cloned().collect();
+
+    if !user_ids.is_empty() {
info!("Releasing {} tokens during shutdown", user_ids.len()); + } + + // Cancel all connections gracefully + for cancel_token in self.connections.values() { + cancel_token.cancel(); + } + + let release_futures = user_ids.iter().map(|user_id| { + let db = &self.db; + let instance_id = &self.config.instance_id; + let user_id_clone = user_id.clone(); + async move { + match db.release_token(&user_id_clone, instance_id).await { + Ok(true) => { + debug!("Released token for user: {}", user_id_clone); + } + Ok(false) => { + debug!("Token for user {} already released", user_id_clone); + } + Err(e) => { + warn!( + "Failed to release token for user {}: {:?}", + user_id_clone, e + ); + } + } + } + }); + + future::join_all(release_futures).await; + + // Clear connections + self.connections.clear(); + + info!("Graceful shutdown completed"); + Ok(()) + } +}