Page MenuHomePhorge

D15182.1765103283.diff
No OneTemporary

Size
9 KB
Referenced Files
None
Subscribers
None

D15182.1765103283.diff

diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -50,6 +50,10 @@
#[arg(env = ENV_WNS_CONFIG)]
#[arg(long)]
pub wns_config: Option<WNSConfig>,
+ /// Farcaster API
+ #[arg(env = "FARCASTER_API_URL")]
+ #[arg(long, default_value = "https://client.farcaster.xyz/")]
+ pub farcaster_api_url: reqwest::Url,
}
/// Stores configuration parsed from command-line arguments
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -24,6 +24,8 @@
pub const PUSH_SERVICE_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
+/// Timeout passed to the HTTP client builder in `FarcasterClient::new`.
+pub const FARCASTER_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
+
pub mod dynamodb {
// This table holds messages which could not be immediately delivered to
// a device.
diff --git a/services/tunnelbroker/src/farcaster/error.rs b/services/tunnelbroker/src/farcaster/error.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/farcaster/error.rs
@@ -0,0 +1,10 @@
+use derive_more::{Display, Error, From};
+
+/// Errors returned by the Farcaster client.
+#[derive(Debug, From, Display, Error)]
+pub enum Error {
+ /// Building the HTTP client, sending the request, or reading the response
+ /// body failed.
+ ReqwestError(reqwest::Error),
+ /// The `Authorization` header value could not be built from the stored
+ /// token.
+ InvalidHeaderValue(reqwest::header::InvalidHeaderValue),
+ /// The database holds no Farcaster token for the requesting user.
+ MissingFarcasterToken,
+ /// The request cannot be sent over HTTP; `FarcasterClient::api_request`
+ /// returns this for `APIMethod::STREAM`.
+ InvalidRequest,
+ /// Looking up the Farcaster token in the database failed.
+ DatabaseError(comm_lib::database::Error),
+}
diff --git a/services/tunnelbroker/src/farcaster/mod.rs b/services/tunnelbroker/src/farcaster/mod.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/farcaster/mod.rs
@@ -0,0 +1,107 @@
+use crate::constants::FARCASTER_REQUEST_TIMEOUT;
+use crate::database::DatabaseClient;
+use crate::farcaster::error::Error::MissingFarcasterToken;
+use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
+use tracing::{debug, error};
+pub mod error;
+
+/// Kind of Farcaster API call carried by a `FarcasterAPIRequest`.
+#[derive(Debug)]
+pub enum APIMethod {
+ /// Sent as an HTTP PUT with the payload as the request body.
+ PUT,
+ /// Sent as an HTTP GET with the payload as the URL query string.
+ GET,
+ /// Not handled by `FarcasterClient::api_request`, which rejects it with
+ /// `Error::InvalidRequest`.
+ STREAM,
+}
+
+/// A single Farcaster API call to be made on behalf of a user.
+pub struct FarcasterAPIRequest {
+ /// Caller-supplied identifier; not read by `FarcasterClient::api_request`.
+ /// NOTE(review): presumably used to correlate the response - confirm.
+ pub request_id: String,
+ /// User whose stored Farcaster token authorizes the call.
+ pub user_id: String,
+ /// API version, examples: "v2", "fc"
+ pub api_version: String,
+ /// Appended to the base URL as a path segment, after `api_version`.
+ pub endpoint: String,
+ /// Selects how `payload` is sent (see `APIMethod`).
+ pub method: APIMethod,
+ /// query, body, or stream message
+ pub payload: String,
+}
+
+/// Client that forwards requests to the Farcaster HTTP API, authorizing each
+/// one with the requesting user's token read from the database.
+#[derive(Clone)]
+pub struct FarcasterClient {
+ /// Base URL to which the API version and endpoint are appended.
+ farcaster_api_url: reqwest::Url,
+ /// HTTP client built with `FARCASTER_REQUEST_TIMEOUT` as its timeout.
+ http_client: reqwest::Client,
+ /// Used to look up the Farcaster token of the requesting user.
+ db_client: DatabaseClient,
+}
+
+impl FarcasterClient {
+  /// Creates a client that sends requests to `farcaster_api_url` and reads
+  /// user tokens through `db_client`.
+  ///
+  /// # Errors
+  ///
+  /// Returns `Error::ReqwestError` if the HTTP client cannot be built.
+  pub fn new(
+    farcaster_api_url: reqwest::Url,
+    db_client: DatabaseClient,
+  ) -> Result<Self, error::Error> {
+    let http_client = reqwest::Client::builder()
+      .timeout(FARCASTER_REQUEST_TIMEOUT)
+      .build()?;
+
+    Ok(FarcasterClient {
+      farcaster_api_url,
+      http_client,
+      db_client,
+    })
+  }
+
+  /// Forwards `request` to the Farcaster API, authorized with the token
+  /// stored for `request.user_id`.
+  ///
+  /// The target URL is `<base>/<api_version>/<endpoint>`. For `PUT` the
+  /// payload is sent as the request body; for `GET` it is used verbatim as
+  /// the query string.
+  ///
+  /// Returns the HTTP status code and the response body as text. The status
+  /// is not inspected here, so non-success responses are also returned as
+  /// `Ok`.
+  ///
+  /// # Errors
+  ///
+  /// - `Error::DatabaseError` if the token lookup fails.
+  /// - `Error::MissingFarcasterToken` if the user has no stored token.
+  /// - `Error::InvalidHeaderValue` if the token cannot be used as a header
+  ///   value.
+  /// - `Error::InvalidRequest` if the method is `APIMethod::STREAM`.
+  /// - `Error::ReqwestError` if sending the request or reading the response
+  ///   body fails.
+  ///
+  /// # Panics
+  ///
+  /// Panics if the configured base URL is a cannot-be-a-base URL (for
+  /// example `mailto:`), because path segments cannot be appended to it.
+  pub async fn api_request(
+    &self,
+    request: FarcasterAPIRequest,
+  ) -> Result<(reqwest::StatusCode, String), error::Error> {
+    debug!(
+      "Received Farcaster {:?} {} {} request from {}",
+      request.method, request.api_version, request.endpoint, request.user_id
+    );
+
+    let farcaster_dc_token = self
+      .db_client
+      .get_farcaster_token(&request.user_id)
+      .await
+      .map_err(error::Error::DatabaseError)?
+      .ok_or(MissingFarcasterToken)?;
+
+    let mut headers = HeaderMap::new();
+    let bearer = format!("Bearer {}", farcaster_dc_token);
+    let mut bearer_header = HeaderValue::from_str(&bearer)?;
+    // Mark the header as sensitive so the token is not printed when headers
+    // are debug-formatted.
+    bearer_header.set_sensitive(true);
+    headers.insert(AUTHORIZATION, bearer_header);
+    headers.insert(
+      reqwest::header::CONTENT_TYPE,
+      HeaderValue::from_static("application/json"),
+    );
+
+    let mut url = self.farcaster_api_url.clone();
+
+    // Append the version and endpoint as path segments. `pop_if_empty` first
+    // drops the empty segment left by a trailing slash in the base URL, so
+    // a base such as `https://host/api/` yields `https://host/api/v2/...`
+    // instead of `https://host/api//v2/...`.
+    // Each value is added as ONE segment: a `/` inside it is
+    // percent-encoded rather than treated as a separator.
+    url
+      .path_segments_mut()
+      .expect("Farcaster API URL must be usable as a base URL")
+      .pop_if_empty()
+      .extend(&[&request.api_version, &request.endpoint]);
+
+    let request_builder = match request.method {
+      APIMethod::PUT => self
+        .http_client
+        .put(url)
+        .headers(headers)
+        .body(request.payload),
+      APIMethod::GET => {
+        // Directly append entire string of params
+        url.set_query(Some(&request.payload));
+        self.http_client.get(url).headers(headers)
+      }
+      APIMethod::STREAM => {
+        error!("Received STREAM API for {}", request.endpoint);
+        return Err(error::Error::InvalidRequest);
+      }
+    };
+
+    let response = request_builder.send().await?;
+    Ok((response.status(), response.text().await?))
+  }
+}
diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -3,11 +3,13 @@
pub mod constants;
pub mod database;
pub mod error;
+pub mod farcaster;
pub mod grpc;
pub mod identity;
pub mod notifs;
pub mod websockets;
+use crate::farcaster::FarcasterClient;
use crate::notifs::NotifClient;
use amqp_client::amqp;
use anyhow::{anyhow, Result};
@@ -51,11 +53,17 @@
let notif_client = NotifClient::new(db_client.clone());
+ let farcaster_api_url = CONFIG.farcaster_api_url.clone();
+ let farcaster_client =
+ FarcasterClient::new(farcaster_api_url, db_client.clone())
+ .expect("Unable to create Farcaster client");
+
let grpc_server = grpc::run_server(db_client.clone(), &amqp_connection);
let websocket_server = websockets::run_server(
db_client.clone(),
&amqp_connection,
notif_client.clone(),
+ farcaster_client.clone(),
);
tokio::select! {
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -1,10 +1,11 @@
pub mod session;
-use crate::amqp_client::amqp::AmqpConnection;
+use crate::amqp::AmqpConnection;
use crate::constants::{SOCKET_HEARTBEAT_TIMEOUT, WS_SESSION_CLOSE_AMQP_MSG};
use crate::database::DatabaseClient;
use crate::notifs::NotifClient;
use crate::websockets::session::SessionError;
+use crate::FarcasterClient;
use crate::CONFIG;
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
@@ -43,6 +44,7 @@
amqp: AmqpConnection,
db_client: DatabaseClient,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
}
impl hyper::service::Service<Request<Body>> for WebsocketService {
@@ -65,6 +67,7 @@
let db_client = self.db_client.clone();
let amqp = self.amqp.clone();
let notif_client = self.notif_client.clone();
+ let farcaster_client = self.farcaster_client.clone();
let future = async move {
// Check if the request is a websocket upgrade request.
@@ -73,8 +76,15 @@
// Spawn a task to handle the websocket connection.
tokio::spawn(async move {
- accept_connection(websocket, addr, db_client, amqp, notif_client)
- .await;
+ accept_connection(
+ websocket,
+ addr,
+ db_client,
+ amqp,
+ notif_client,
+ farcaster_client,
+ )
+ .await;
});
// Return the response so the spawned future can continue.
@@ -104,6 +114,7 @@
db_client: DatabaseClient,
amqp_connection: &AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) -> Result<(), BoxedError> {
let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR")
.unwrap_or_else(|_| format!("0.0.0.0:{}", &CONFIG.http_port));
@@ -125,6 +136,7 @@
db_client: db_client.clone(),
addr,
notif_client: notif_client.clone(),
+ farcaster_client: farcaster_client.clone(),
},
)
.with_upgrades();
@@ -169,6 +181,7 @@
db_client: DatabaseClient,
amqp_connection: AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) {
debug!("Incoming connection from: {}", addr);
@@ -194,6 +207,7 @@
db_client,
amqp_connection,
notif_client,
+ farcaster_client,
)
.await
{
@@ -322,12 +336,20 @@
db_client: DatabaseClient,
amqp: AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) -> Result<WebsocketSession<S>, ErrorWithStreamHandle<S>> {
let device_info = match get_device_info_from_frame(frame).await {
Ok(info) => info,
Err(e) => return Err((e, outgoing)),
};
- WebsocketSession::new(outgoing, db_client, device_info, amqp, notif_client)
- .await
+ WebsocketSession::new(
+ outgoing,
+ db_client,
+ device_info,
+ amqp,
+ notif_client,
+ farcaster_client,
+ )
+ .await
}
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -21,6 +21,7 @@
use crate::amqp_client::AmqpClient;
use crate::database::{self, DatabaseClient};
+use crate::farcaster::FarcasterClient;
use crate::identity;
use crate::notifs::NotifClient;
@@ -41,6 +42,7 @@
// Each websocket has an AMQP connection associated with a particular device
amqp_client: AmqpClient,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
}
#[derive(
@@ -162,6 +164,7 @@
device_info: DeviceInfo,
amqp: AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) -> Result<Self, super::ErrorWithStreamHandle<S>> {
let amqp_client =
match AmqpClient::new(db_client.clone(), device_info.clone(), amqp).await
@@ -176,6 +179,7 @@
device_info,
amqp_client,
notif_client,
+ farcaster_client,
})
}

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 7, 10:28 AM (19 h, 30 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5843532
Default Alt Text
D15182.1765103283.diff (9 KB)

Event Timeline