Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32197394
D15182.1765103283.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
9 KB
Referenced Files
None
Subscribers
None
D15182.1765103283.diff
View Options
diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -50,6 +50,10 @@
#[arg(env = ENV_WNS_CONFIG)]
#[arg(long)]
pub wns_config: Option<WNSConfig>,
+ /// Farcaster API
+ #[arg(env = "FARCASTER_API_URL")]
+ #[arg(long, default_value = "https://client.farcaster.xyz/")]
+ pub farcaster_api_url: reqwest::Url,
}
/// Stores configuration parsed from command-line arguments
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -24,6 +24,8 @@
pub const PUSH_SERVICE_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
+/// Timeout set on the HTTP client that `FarcasterClient` uses for Farcaster
+/// API requests.
+pub const FARCASTER_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
+
pub mod dynamodb {
// This table holds messages which could not be immediately delivered to
// a device.
diff --git a/services/tunnelbroker/src/farcaster/error.rs b/services/tunnelbroker/src/farcaster/error.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/farcaster/error.rs
@@ -0,0 +1,10 @@
+use derive_more::{Display, Error, From};
+
+/// Errors that can occur while creating a `FarcasterClient` or while
+/// performing a Farcaster API request.
+///
+/// The `From` derive provides the conversions that let `?` turn the wrapped
+/// error types into this enum.
+#[derive(Debug, From, Display, Error)]
+pub enum Error {
+ /// Failure reported by `reqwest`: building the HTTP client, sending a
+ /// request, or reading the response body.
+ ReqwestError(reqwest::Error),
+ /// The `Authorization` header value could not be built from the stored
+ /// token.
+ InvalidHeaderValue(reqwest::header::InvalidHeaderValue),
+ /// No Farcaster token is stored for the requesting user.
+ MissingFarcasterToken,
+ /// The request cannot be served, e.g. it uses the `STREAM` API method.
+ InvalidRequest,
+ /// Looking up the Farcaster token in the database failed.
+ DatabaseError(comm_lib::database::Error),
+}
diff --git a/services/tunnelbroker/src/farcaster/mod.rs b/services/tunnelbroker/src/farcaster/mod.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/farcaster/mod.rs
@@ -0,0 +1,107 @@
+use crate::constants::FARCASTER_REQUEST_TIMEOUT;
+use crate::database::DatabaseClient;
+use crate::farcaster::error::Error::MissingFarcasterToken;
+use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
+use tracing::{debug, error};
+pub mod error;
+
+/// Kind of Farcaster API call carried by a `FarcasterAPIRequest`.
+#[derive(Debug)]
+pub enum APIMethod {
+ /// Sent as an HTTP `PUT`; the request payload becomes the request body.
+ PUT,
+ /// Sent as an HTTP `GET`; the request payload becomes the query string.
+ GET,
+ /// Stream message. `FarcasterClient::api_request` does not handle this
+ /// method and answers it with `Error::InvalidRequest`.
+ STREAM,
+}
+
+/// A single Farcaster API call to be forwarded by `FarcasterClient`.
+pub struct FarcasterAPIRequest {
+ /// Identifier of this request. It is not read by
+ /// `FarcasterClient::api_request`; presumably the caller uses it to match
+ /// responses to requests (TODO confirm against the caller).
+ pub request_id: String,
+ /// User on whose behalf the call is made; used to look up the stored
+ /// Farcaster token.
+ pub user_id: String,
+ /// API version, examples: "v2", "fc"
+ /// Appended to the base URL as the first path segment.
+ pub api_version: String,
+ /// Endpoint name, appended to the URL as the path segment that follows
+ /// `api_version`.
+ pub endpoint: String,
+ /// How the call is sent, see `APIMethod`.
+ pub method: APIMethod,
+ /// query, body, or stream message
+ /// (`GET`: used as the query string, `PUT`: used as the request body).
+ pub payload: String,
+}
+
+/// Client that forwards requests to the Farcaster API, authenticating each
+/// call with the token stored for the requesting user.
+///
+/// The client is `Clone`; the websocket service clones it for every incoming
+/// connection.
+#[derive(Clone)]
+pub struct FarcasterClient {
+ /// Base URL of the Farcaster API; request paths are appended to a copy of it.
+ farcaster_api_url: reqwest::Url,
+ /// HTTP client built with `FARCASTER_REQUEST_TIMEOUT` as its timeout.
+ http_client: reqwest::Client,
+ /// Database handle used to fetch the user's Farcaster token.
+ db_client: DatabaseClient,
+}
+
+impl FarcasterClient {
+  /// Creates a client that sends its requests to `farcaster_api_url`.
+  ///
+  /// The underlying HTTP client is configured with
+  /// `FARCASTER_REQUEST_TIMEOUT` as its request timeout.
+  ///
+  /// # Errors
+  ///
+  /// Returns `Error::ReqwestError` if the HTTP client cannot be built.
+  pub fn new(
+    farcaster_api_url: reqwest::Url,
+    db_client: DatabaseClient,
+  ) -> Result<Self, error::Error> {
+    let http_client = reqwest::Client::builder()
+      .timeout(FARCASTER_REQUEST_TIMEOUT)
+      .build()?;
+
+    Ok(FarcasterClient {
+      farcaster_api_url,
+      http_client,
+      db_client,
+    })
+  }
+
+  /// Forwards `request` to the Farcaster API, authenticated with the token
+  /// stored for `request.user_id`.
+  ///
+  /// The target URL is the configured base URL followed by the
+  /// `request.api_version` and `request.endpoint` path segments. For
+  /// `APIMethod::PUT` the payload is sent as the request body; for
+  /// `APIMethod::GET` it is used verbatim as the query string.
+  ///
+  /// Returns the HTTP status code together with the response body as text.
+  /// Non-success statuses are handed back to the caller, not turned into
+  /// errors.
+  ///
+  /// # Errors
+  ///
+  /// * `Error::DatabaseError` - the token lookup failed.
+  /// * `Error::MissingFarcasterToken` - no token is stored for the user.
+  /// * `Error::InvalidHeaderValue` - the token cannot be used as a header
+  ///   value.
+  /// * `Error::InvalidRequest` - the method is `APIMethod::STREAM`, or the
+  ///   configured base URL cannot have path segments appended to it.
+  /// * `Error::ReqwestError` - sending the request or reading the response
+  ///   body failed.
+  pub async fn api_request(
+    &self,
+    request: FarcasterAPIRequest,
+  ) -> Result<(reqwest::StatusCode, String), error::Error> {
+    debug!(
+      "Received Farcaster {:?} {} {} request from {}",
+      request.method, request.api_version, request.endpoint, request.user_id
+    );
+
+    let farcaster_dc_token = self
+      .db_client
+      .get_farcaster_token(&request.user_id)
+      .await
+      .map_err(error::Error::DatabaseError)?
+      .ok_or(MissingFarcasterToken)?;
+
+    let mut headers = HeaderMap::new();
+    let bearer = format!("Bearer {}", farcaster_dc_token);
+    let mut bearer_header = HeaderValue::from_str(&bearer)?;
+    // Keep the token out of `Debug` output of the header map.
+    bearer_header.set_sensitive(true);
+    headers.insert(AUTHORIZATION, bearer_header);
+    headers.insert(
+      reqwest::header::CONTENT_TYPE,
+      HeaderValue::from_static("application/json"),
+    );
+
+    let mut url = self.farcaster_api_url.clone();
+
+    // Append path segments. The base URL comes from configuration, so a
+    // cannot-be-a-base URL (e.g. `mailto:`) must not panic the connection
+    // task; report it as an error instead.
+    // NOTE(review): a `/` inside `endpoint` is percent-encoded because each
+    // value is appended as one segment - confirm endpoints are single-segment.
+    match url.path_segments_mut() {
+      Ok(mut segments) => {
+        segments
+          // Drop a trailing empty segment so that a base URL ending in `/`
+          // does not produce `//` before the appended segments.
+          .pop_if_empty()
+          .extend(&[&request.api_version, &request.endpoint]);
+      }
+      Err(()) => {
+        error!(
+          "Farcaster API URL cannot be a base: {}",
+          self.farcaster_api_url
+        );
+        return Err(error::Error::InvalidRequest);
+      }
+    }
+
+    let request_builder = match request.method {
+      APIMethod::PUT => self
+        .http_client
+        .put(url)
+        .headers(headers)
+        .body(request.payload),
+      APIMethod::GET => {
+        // Directly use the entire string of params as the query; an empty
+        // payload clears the query instead of leaving a dangling `?`.
+        let query = if request.payload.is_empty() {
+          None
+        } else {
+          Some(request.payload.as_str())
+        };
+        url.set_query(query);
+        self.http_client.get(url).headers(headers)
+      }
+      APIMethod::STREAM => {
+        error!("Received STREAM API for {}", request.endpoint);
+        return Err(error::Error::InvalidRequest);
+      }
+    };
+
+    let response = request_builder.send().await?;
+    Ok((response.status(), response.text().await?))
+  }
+}
diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -3,11 +3,13 @@
pub mod constants;
pub mod database;
pub mod error;
+pub mod farcaster;
pub mod grpc;
pub mod identity;
pub mod notifs;
pub mod websockets;
+use crate::farcaster::FarcasterClient;
use crate::notifs::NotifClient;
use amqp_client::amqp;
use anyhow::{anyhow, Result};
@@ -51,11 +53,17 @@
let notif_client = NotifClient::new(db_client.clone());
+ let farcaster_api_url = CONFIG.farcaster_api_url.clone();
+ let farcaster_client =
+ FarcasterClient::new(farcaster_api_url, db_client.clone())
+ .expect("Unable to create Farcaster client");
+
let grpc_server = grpc::run_server(db_client.clone(), &amqp_connection);
let websocket_server = websockets::run_server(
db_client.clone(),
&amqp_connection,
notif_client.clone(),
+ farcaster_client.clone(),
);
tokio::select! {
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -1,10 +1,11 @@
pub mod session;
-use crate::amqp_client::amqp::AmqpConnection;
+use crate::amqp::AmqpConnection;
use crate::constants::{SOCKET_HEARTBEAT_TIMEOUT, WS_SESSION_CLOSE_AMQP_MSG};
use crate::database::DatabaseClient;
use crate::notifs::NotifClient;
use crate::websockets::session::SessionError;
+use crate::FarcasterClient;
use crate::CONFIG;
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
@@ -43,6 +44,7 @@
amqp: AmqpConnection,
db_client: DatabaseClient,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
}
impl hyper::service::Service<Request<Body>> for WebsocketService {
@@ -65,6 +67,7 @@
let db_client = self.db_client.clone();
let amqp = self.amqp.clone();
let notif_client = self.notif_client.clone();
+ let farcaster_client = self.farcaster_client.clone();
let future = async move {
// Check if the request is a websocket upgrade request.
@@ -73,8 +76,15 @@
// Spawn a task to handle the websocket connection.
tokio::spawn(async move {
- accept_connection(websocket, addr, db_client, amqp, notif_client)
- .await;
+ accept_connection(
+ websocket,
+ addr,
+ db_client,
+ amqp,
+ notif_client,
+ farcaster_client,
+ )
+ .await;
});
// Return the response so the spawned future can continue.
@@ -104,6 +114,7 @@
db_client: DatabaseClient,
amqp_connection: &AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) -> Result<(), BoxedError> {
let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR")
.unwrap_or_else(|_| format!("0.0.0.0:{}", &CONFIG.http_port));
@@ -125,6 +136,7 @@
db_client: db_client.clone(),
addr,
notif_client: notif_client.clone(),
+ farcaster_client: farcaster_client.clone(),
},
)
.with_upgrades();
@@ -169,6 +181,7 @@
db_client: DatabaseClient,
amqp_connection: AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) {
debug!("Incoming connection from: {}", addr);
@@ -194,6 +207,7 @@
db_client,
amqp_connection,
notif_client,
+ farcaster_client,
)
.await
{
@@ -322,12 +336,20 @@
db_client: DatabaseClient,
amqp: AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) -> Result<WebsocketSession<S>, ErrorWithStreamHandle<S>> {
let device_info = match get_device_info_from_frame(frame).await {
Ok(info) => info,
Err(e) => return Err((e, outgoing)),
};
- WebsocketSession::new(outgoing, db_client, device_info, amqp, notif_client)
- .await
+ WebsocketSession::new(
+ outgoing,
+ db_client,
+ device_info,
+ amqp,
+ notif_client,
+ farcaster_client,
+ )
+ .await
}
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -21,6 +21,7 @@
use crate::amqp_client::AmqpClient;
use crate::database::{self, DatabaseClient};
+use crate::farcaster::FarcasterClient;
use crate::identity;
use crate::notifs::NotifClient;
@@ -41,6 +42,7 @@
// Each websocket has an AMQP connection associated with a particular device
amqp_client: AmqpClient,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
}
#[derive(
@@ -162,6 +164,7 @@
device_info: DeviceInfo,
amqp: AmqpConnection,
notif_client: NotifClient,
+ farcaster_client: FarcasterClient,
) -> Result<Self, super::ErrorWithStreamHandle<S>> {
let amqp_client =
match AmqpClient::new(db_client.clone(), device_info.clone(), amqp).await
@@ -176,6 +179,7 @@
device_info,
amqp_client,
notif_client,
+ farcaster_client,
})
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Dec 7, 10:28 AM (19 h, 30 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5843532
Default Alt Text
D15182.1765103283.diff (9 KB)
Attached To
Mode
D15182: [tunnelbroker] implement Farcaster client
Attached
Detach File
Event Timeline
Log In to Comment