diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -50,6 +50,10 @@
   #[arg(env = ENV_WNS_CONFIG)]
   #[arg(long)]
   pub wns_config: Option,
+  /// Farcaster API
+  #[arg(env = "FARCASTER_API_URL")]
+  #[arg(long, default_value = "https://client.farcaster.xyz/")]
+  pub farcaster_api_url: reqwest::Url,
 }
 
 /// Stores configuration parsed from command-line arguments
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -24,6 +24,8 @@
 
 pub const PUSH_SERVICE_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
 
+pub const FARCASTER_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
+
 pub mod dynamodb {
   // This table holds messages which could not be immediately delivered to
   // a device.
diff --git a/services/tunnelbroker/src/farcaster/error.rs b/services/tunnelbroker/src/farcaster/error.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/farcaster/error.rs
@@ -0,0 +1,14 @@
+use derive_more::{Display, Error, From};
+
+/// Errors returned by the Farcaster client.
+#[derive(Debug, From, Display, Error)]
+pub enum Error {
+  ReqwestError(reqwest::Error),
+  InvalidHeaderValue(reqwest::header::InvalidHeaderValue),
+  /// The token lookup for the requesting user returned nothing.
+  MissingFarcasterToken,
+  /// The request cannot be sent: unsupported method, or the configured
+  /// Farcaster API URL cannot take path segments.
+  InvalidRequest,
+  DatabaseError(comm_lib::database::Error),
+}
diff --git a/services/tunnelbroker/src/farcaster/mod.rs b/services/tunnelbroker/src/farcaster/mod.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/farcaster/mod.rs
@@ -0,0 +1,134 @@
+use crate::constants::FARCASTER_REQUEST_TIMEOUT;
+use crate::database::DatabaseClient;
+use crate::farcaster::error::Error::MissingFarcasterToken;
+use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
+use tracing::{debug, error};
+pub mod error;
+
+/// Kind of call a [`FarcasterAPIRequest`] asks for.
+#[derive(Debug)]
+pub enum APIMethod {
+  PUT,
+  GET,
+  /// Rejected by `FarcasterClient::api_request` with `InvalidRequest`.
+  STREAM,
+}
+
+/// A single request for `FarcasterClient::api_request`.
+pub struct FarcasterAPIRequest {
+  pub request_id: String,
+  pub user_id: String,
+  /// API version, examples: "v2", "fc"
+  pub api_version: String,
+  pub endpoint: String,
+  pub method: APIMethod,
+  /// query, body, or stream message
+  pub payload: String,
+}
+
+/// HTTP client for the Farcaster API. The bearer token for each request is
+/// fetched through the database client, keyed by the request's user ID.
+#[derive(Clone)]
+pub struct FarcasterClient {
+  farcaster_api_url: reqwest::Url,
+  http_client: reqwest::Client,
+  db_client: DatabaseClient,
+}
+
+impl FarcasterClient {
+  /// Builds a client whose HTTP requests use `FARCASTER_REQUEST_TIMEOUT`.
+  ///
+  /// # Errors
+  ///
+  /// Returns the error from building the underlying `reqwest::Client`.
+  pub fn new(
+    farcaster_api_url: reqwest::Url,
+    db_client: DatabaseClient,
+  ) -> Result<Self, error::Error> {
+    let http_client = reqwest::Client::builder()
+      .timeout(FARCASTER_REQUEST_TIMEOUT)
+      .build()?;
+
+    Ok(FarcasterClient {
+      farcaster_api_url,
+      http_client,
+      db_client,
+    })
+  }
+
+  /// Sends `request` to `<base URL>/<api_version>/<endpoint>`, authorized
+  /// with the token returned by `get_farcaster_token` for the user, and
+  /// returns the response status and body text.
+  ///
+  /// `PUT` sends the payload as the body and `GET` uses it as the raw query
+  /// string.
+  ///
+  /// # Errors
+  ///
+  /// Returns `MissingFarcasterToken` when no token is found for the user,
+  /// `InvalidRequest` for `STREAM` requests or when the configured base URL
+  /// cannot take path segments, and the wrapped database, header, or HTTP
+  /// error otherwise.
+  pub async fn api_request(
+    &self,
+    request: FarcasterAPIRequest,
+  ) -> Result<(reqwest::StatusCode, String), error::Error> {
+    debug!(
+      "Received Farcaster {:?} {} {} request from {}",
+      request.method, request.api_version, request.endpoint, request.user_id
+    );
+
+    let farcaster_dc_token = self
+      .db_client
+      .get_farcaster_token(&request.user_id)
+      .await
+      .map_err(error::Error::DatabaseError)?
+      .ok_or(MissingFarcasterToken)?;
+
+    let mut headers = HeaderMap::new();
+    let bearer = format!("Bearer {}", farcaster_dc_token);
+    let mut bearer_header = HeaderValue::from_str(&bearer)?;
+    bearer_header.set_sensitive(true);
+    headers.insert(AUTHORIZATION, bearer_header);
+    headers.insert(
+      reqwest::header::CONTENT_TYPE,
+      HeaderValue::from_static("application/json"),
+    );
+
+    let mut url = self.farcaster_api_url.clone();
+
+    // Append path segments. The base URL comes from configuration, so a URL
+    // that cannot take path segments (e.g. `mailto:`) is reported as an
+    // error instead of panicking. `pop_if_empty` drops the empty segment
+    // left by a trailing slash (e.g. `https://host/api/`) so the joined
+    // path does not contain `//`.
+    url
+      .path_segments_mut()
+      .map_err(|_| {
+        error!("Farcaster API URL cannot be a base URL");
+        error::Error::InvalidRequest
+      })?
+      .pop_if_empty()
+      .extend(&[&request.api_version, &request.endpoint]);
+
+    let request_builder = match request.method {
+      APIMethod::PUT => self
+        .http_client
+        .put(url)
+        .headers(headers)
+        .body(request.payload),
+      APIMethod::GET => {
+        // Directly append entire string of params
+        url.set_query(Some(&request.payload));
+        self.http_client.get(url).headers(headers)
+      }
+      APIMethod::STREAM => {
+        error!("Received STREAM API for {}", request.endpoint);
+        return Err(error::Error::InvalidRequest);
+      }
+    };
+
+    let response = request_builder.send().await?;
+    Ok((response.status(), response.text().await?))
+  }
+}
diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -3,11 +3,13 @@
 pub mod constants;
 pub mod database;
 pub mod error;
+pub mod farcaster;
 pub mod grpc;
 pub mod identity;
 pub mod notifs;
 pub mod websockets;
 
+use crate::farcaster::FarcasterClient;
 use crate::notifs::NotifClient;
 use amqp_client::amqp;
 use anyhow::{anyhow, Result};
@@ -51,11 +53,17 @@
 
   let notif_client = NotifClient::new(db_client.clone());
 
+  let farcaster_api_url = CONFIG.farcaster_api_url.clone();
+  let farcaster_client =
+    FarcasterClient::new(farcaster_api_url, db_client.clone())
+      .expect("Unable to create Farcaster client");
+
   let grpc_server = grpc::run_server(db_client.clone(), &amqp_connection);
   let websocket_server = websockets::run_server(
     db_client.clone(),
     &amqp_connection,
     notif_client.clone(),
+    farcaster_client.clone(),
   );
 
   tokio::select! {
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -1,10 +1,11 @@
 pub mod session;
 
-use crate::amqp_client::amqp::AmqpConnection;
+use crate::amqp::AmqpConnection;
 use crate::constants::{SOCKET_HEARTBEAT_TIMEOUT, WS_SESSION_CLOSE_AMQP_MSG};
 use crate::database::DatabaseClient;
 use crate::notifs::NotifClient;
 use crate::websockets::session::SessionError;
+use crate::FarcasterClient;
 use crate::CONFIG;
 use futures_util::stream::SplitSink;
 use futures_util::{SinkExt, StreamExt};
@@ -43,6 +44,7 @@
   amqp: AmqpConnection,
   db_client: DatabaseClient,
   notif_client: NotifClient,
+  farcaster_client: FarcasterClient,
 }
 
 impl hyper::service::Service> for WebsocketService {
@@ -65,6 +67,7 @@
     let db_client = self.db_client.clone();
     let amqp = self.amqp.clone();
     let notif_client = self.notif_client.clone();
+    let farcaster_client = self.farcaster_client.clone();
 
     let future = async move {
       // Check if the request is a websocket upgrade request.
@@ -73,8 +76,15 @@
 
         // Spawn a task to handle the websocket connection.
         tokio::spawn(async move {
-          accept_connection(websocket, addr, db_client, amqp, notif_client)
-            .await;
+          accept_connection(
+            websocket,
+            addr,
+            db_client,
+            amqp,
+            notif_client,
+            farcaster_client,
+          )
+          .await;
         });
 
         // Return the response so the spawned future can continue.
@@ -104,6 +114,7 @@
   db_client: DatabaseClient,
   amqp_connection: &AmqpConnection,
   notif_client: NotifClient,
+  farcaster_client: FarcasterClient,
 ) -> Result<(), BoxedError> {
   let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR")
     .unwrap_or_else(|_| format!("0.0.0.0:{}", &CONFIG.http_port));
@@ -125,6 +136,7 @@
           db_client: db_client.clone(),
           addr,
           notif_client: notif_client.clone(),
+          farcaster_client: farcaster_client.clone(),
         },
       )
       .with_upgrades();
@@ -169,6 +181,7 @@
   db_client: DatabaseClient,
   amqp_connection: AmqpConnection,
   notif_client: NotifClient,
+  farcaster_client: FarcasterClient,
 ) {
   debug!("Incoming connection from: {}", addr);
 
@@ -194,6 +207,7 @@
     db_client,
     amqp_connection,
     notif_client,
+    farcaster_client,
   )
   .await
   {
@@ -322,12 +336,20 @@
   db_client: DatabaseClient,
   amqp: AmqpConnection,
   notif_client: NotifClient,
+  farcaster_client: FarcasterClient,
 ) -> Result, ErrorWithStreamHandle> {
   let device_info = match get_device_info_from_frame(frame).await {
     Ok(info) => info,
     Err(e) => return Err((e, outgoing)),
   };
 
-  WebsocketSession::new(outgoing, db_client, device_info, amqp, notif_client)
-    .await
+  WebsocketSession::new(
+    outgoing,
+    db_client,
+    device_info,
+    amqp,
+    notif_client,
+    farcaster_client,
+  )
+  .await
 }
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -21,6 +21,7 @@
 
 use crate::amqp_client::AmqpClient;
 use crate::database::{self, DatabaseClient};
+use crate::farcaster::FarcasterClient;
 use crate::identity;
 use crate::notifs::NotifClient;
 
@@ -41,6 +42,7 @@
   // Each websocket has an AMQP connection associated with a particular device
   amqp_client: AmqpClient,
   notif_client: NotifClient,
+  farcaster_client: FarcasterClient,
 }
 
 #[derive(
@@ -162,6 +164,7 @@
     device_info: DeviceInfo,
     amqp: AmqpConnection,
     notif_client: NotifClient,
+    farcaster_client: FarcasterClient,
   ) -> Result> {
     let amqp_client =
       match AmqpClient::new(db_client.clone(), device_info.clone(), amqp).await
@@ -176,6 +179,7 @@
       device_info,
       amqp_client,
       notif_client,
+      farcaster_client,
     })
   }
 