diff --git a/lib/types/tunnelbroker/farcaster-messages-types.js b/lib/types/tunnelbroker/farcaster-messages-types.js --- a/lib/types/tunnelbroker/farcaster-messages-types.js +++ b/lib/types/tunnelbroker/farcaster-messages-types.js @@ -5,7 +5,13 @@ import { tShape, tString } from '../../utils/validation-utils.js'; -type APIMethod = { +type: 'GET' } | { +type: 'PUT' } | { +type: 'POST' }; +type APIMethod = + | { + +type: 'GET', + } + | { +type: 'PUT' } + | { +type: 'POST' } + | { +type: 'STREAM' }; export type FarcasterAPIRequest = { +type: 'FarcasterAPIRequest', diff --git a/services/tunnelbroker/src/farcaster/error.rs b/services/tunnelbroker/src/farcaster/error.rs --- a/services/tunnelbroker/src/farcaster/error.rs +++ b/services/tunnelbroker/src/farcaster/error.rs @@ -4,6 +4,7 @@ pub enum Error { ReqwestError(reqwest::Error), InvalidHeaderValue(reqwest::header::InvalidHeaderValue), + AmqpError(lapin::Error), MissingFarcasterToken, InvalidRequest, DatabaseError(comm_lib::database::Error), diff --git a/services/tunnelbroker/src/farcaster/mod.rs b/services/tunnelbroker/src/farcaster/mod.rs --- a/services/tunnelbroker/src/farcaster/mod.rs +++ b/services/tunnelbroker/src/farcaster/mod.rs @@ -1,6 +1,8 @@ +use crate::amqp_client::amqp::AmqpConnection; use crate::constants::FARCASTER_REQUEST_TIMEOUT; use crate::database::DatabaseClient; use crate::farcaster::error::Error::MissingFarcasterToken; +use lapin::{BasicProperties, ExchangeKind}; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION}; use tracing::{debug, error}; use tunnelbroker_messages::farcaster::{APIMethod, FarcasterAPIRequest}; @@ -12,12 +14,14 @@ farcaster_api_url: reqwest::Url, http_client: reqwest::Client, db_client: DatabaseClient, + amqp_connection: AmqpConnection, } impl FarcasterClient { pub fn new( farcaster_api_url: reqwest::Url, db_client: DatabaseClient, + amqp_connection: &AmqpConnection, ) -> Result { let http_client = reqwest::Client::builder() .timeout(FARCASTER_REQUEST_TIMEOUT) @@ -27,6 +31,7 @@ farcaster_api_url, http_client, db_client, + amqp_connection: amqp_connection.clone(), }) } @@ -84,9 +89,51 @@ .post(url) .headers(headers) .body(request.payload), + APIMethod::STREAM => { + error!("STREAM method should be handled earlier,not in api_request"); + return Err(error::Error::InvalidRequest); + } }; let response = request_builder.send().await?; Ok((response.status(), response.text().await?)) } + + pub async fn handle_stream_request( + &self, + request: FarcasterAPIRequest, + ) -> Result<(), error::Error> { + debug!("Handling STREAM request for user {}", request.user_id); + + let topic_name = format!("farcaster_user_{}", request.user_id); + + let channel = self.amqp_connection.new_channel().await?; + + // Declare exchange + channel + .exchange_declare( + &topic_name, + ExchangeKind::Direct, + lapin::options::ExchangeDeclareOptions::default(), + lapin::types::FieldTable::default(), + ) + .await?; + + // Publish the stream message + channel + .basic_publish( + &topic_name, + "", + lapin::options::BasicPublishOptions::default(), + request.payload.as_bytes(), + BasicProperties::default(), + ) + .await?; + + debug!( + "Successfully published stream message to topic: {}", + topic_name + ); + Ok(()) + } } diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -56,9 +56,12 @@ let notif_client = NotifClient::new(db_client.clone()); let farcaster_api_url = CONFIG.farcaster_api_url.clone(); - let farcaster_client = - FarcasterClient::new(farcaster_api_url, db_client.clone()) - .expect("Unable to create Farcaster client"); + let farcaster_client = FarcasterClient::new( + farcaster_api_url, + db_client.clone(), + &amqp_connection, + ) + .expect("Unable to create Farcaster client"); let grpc_server = grpc::run_server(db_client.clone(), &amqp_connection); let websocket_server = websockets::run_server( @@ -70,7 +73,7 @@ let token_config = TokenDistributorConfig::default(); let mut token_distributor = - TokenDistributor::new(db_client.clone(), token_config); + TokenDistributor::new(db_client.clone(), token_config, &amqp_connection); tokio::select! { grpc_result = grpc_server => { diff --git a/services/tunnelbroker/src/token_distributor/error.rs b/services/tunnelbroker/src/token_distributor/error.rs --- a/services/tunnelbroker/src/token_distributor/error.rs +++ b/services/tunnelbroker/src/token_distributor/error.rs @@ -20,6 +20,8 @@ PingTimeout, #[display(fmt = "Connection cancelled")] Cancelled, + #[display(fmt = "AMQP setup failed: {}", _0)] + AmqpSetupFailed(String), } impl std::error::Error for TokenConnectionError { diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -2,6 +2,7 @@ mod error; mod token_connection; +use crate::amqp_client::amqp::AmqpConnection; use crate::constants::error_types; use crate::database::DatabaseClient; pub(crate) use crate::token_distributor::config::TokenDistributorConfig; @@ -17,10 +18,15 @@ db: DatabaseClient, config: TokenDistributorConfig, connections: HashMap, + amqp_connection: AmqpConnection, } impl TokenDistributor { - pub fn new(db: DatabaseClient, config: TokenDistributorConfig) -> Self { + pub fn new( + db: DatabaseClient, + config: TokenDistributorConfig, + amqp_connection: &AmqpConnection, + ) -> Self { info!( "Initializing TokenDistributor - max_connections: {}, \ scan_interval: {}s, heartbeat_interval: {}s, heartbeat_timeout: {}s,\ @@ -46,6 +52,7 @@ db, config, connections: HashMap::new(), + amqp_connection: amqp_connection.clone(), } } @@ -180,6 +187,7 @@ self.config.clone(), user_id.clone(), token_data, + self.amqp_connection.clone(), cancel_token.clone(), ); diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -1,7 +1,9 @@ +use crate::amqp_client::amqp::AmqpConnection; use crate::database::DatabaseClient; use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::error::TokenConnectionError; use futures_util::{SinkExt, StreamExt}; +use lapin::{options::*, types::FieldTable, ExchangeKind}; use std::time::Duration; use tokio::time::{interval, Instant}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; @@ -13,6 +15,7 @@ config: TokenDistributorConfig, user_id: String, token_data: String, + amqp_connection: AmqpConnection, } impl TokenConnection { @@ -21,6 +24,7 @@ config: TokenDistributorConfig, user_id: String, token_data: String, + amqp_connection: AmqpConnection, cancellation_token: CancellationToken, ) { let connection = Self { @@ -28,6 +32,7 @@ config: config.clone(), user_id: user_id.clone(), token_data, + amqp_connection, }; tokio::spawn(async move { @@ -47,6 +52,7 @@ TokenConnectionError::TokenOwnershipLost => "TokenOwnershipLost", TokenConnectionError::HeartbeatFailed(_) => "HeartbeatFailed", TokenConnectionError::Cancelled => "Cancelled", + TokenConnectionError::AmqpSetupFailed(_) => "AmqpSetupFailed", }; info!( @@ -182,6 +188,22 @@ self.user_id ); + // Set up AMQP topic listener for farcaster messages + let topic_name = format!("farcaster_user_{}", self.user_id); + let mut amqp_consumer = match self.setup_amqp_consumer(&topic_name).await { + Ok(consumer) => consumer, + Err(e) => { + error!( + "Failed to setup AMQP consumer for user {}: {}", + self.user_id, e + ); + return Err(TokenConnectionError::AmqpSetupFailed(format!( + "Failed to setup AMQP consumer: {}", + e + ))); + } + }; + let mut heartbeat_interval = interval(self.config.heartbeat_interval); let mut last_ping = Instant::now(); // Track last ping time let ping_timeout = tokio::time::sleep(self.config.ping_timeout); @@ -195,6 +217,32 @@ loop { tokio::select! { + // Handle AMQP messages and forward to WebSocket + amqp_msg = amqp_consumer.next() => { + if let Some(delivery_result) = amqp_msg { + match delivery_result { + Ok(delivery) => { + let payload = String::from_utf8_lossy(&delivery.data); + debug!("Received AMQP message for user {}: {}", self.user_id, payload); + + // Forward message to WebSocket + if let Err(e) = write.send(Message::Text(payload.to_string())).await { + error!("Failed to forward AMQP message to WebSocket for user {}: {:?}", self.user_id, e); + } else { + // Acknowledge the AMQP message + if let Err(e) = delivery.ack(BasicAckOptions::default()).await { + error!("Failed to acknowledge AMQP message for user {}: {:?}", self.user_id, e); + } + info!("Message {:?} sent", payload); + } + } + Err(e) => { + error!("AMQP consumer error for user {}: {:?}", self.user_id, e); + } + } + } + } + msg = read.next() => { match msg { Some(Ok(msg)) => match msg { @@ -295,4 +343,62 @@ } } } + + async fn setup_amqp_consumer( + &self, + topic_name: &str, + ) -> Result { + let channel = self.amqp_connection.new_channel().await?; + + // Declare exchange + channel + .exchange_declare( + topic_name, + ExchangeKind::Direct, + ExchangeDeclareOptions::default(), + FieldTable::default(), + ) + .await?; + + // Declare queue with unique name for this connection + let queue_name = format!("{}_{}", topic_name, self.user_id); + let queue = channel + .queue_declare( + &queue_name, + QueueDeclareOptions { + auto_delete: true, + exclusive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await?; + + // Bind queue to exchange + channel + .queue_bind( + queue.name().as_str(), + topic_name, + "", + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + // Create consumer + let consumer = channel + .basic_consume( + queue.name().as_str(), + &format!("consumer_{}_{}", topic_name, self.user_id), + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + debug!( + "AMQP consumer set up for topic: {} user: {}", + topic_name, self.user_id + ); + Ok(consumer) + } } diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -428,6 +428,25 @@ }; } + // Handle STREAM method separately + if matches!( + request.method, + tunnelbroker_messages::farcaster::APIMethod::STREAM + ) { + let response = + match self.farcaster_client.handle_stream_request(request).await { + Ok(()) => FarcasterAPIResponseData::Success( + "Message published to stream".to_string(), + ), + Err(err) => FarcasterAPIResponseData::Error(err.to_string()), + }; + + return FarcasterAPIResponse { + request_id, + response, + }; + } + let response = match self.farcaster_client.api_request(request).await { Ok((status, response)) => { if status.is_success() { diff --git a/shared/tunnelbroker_messages/src/messages/farcaster.rs b/shared/tunnelbroker_messages/src/messages/farcaster.rs --- a/shared/tunnelbroker_messages/src/messages/farcaster.rs +++ b/shared/tunnelbroker_messages/src/messages/farcaster.rs @@ -7,6 +7,7 @@ PUT, GET, POST, + STREAM, } #[derive(Serialize, Deserialize, TagAwareDeserialize, PartialEq, Debug)]