Patch summary: reformats the TokenDistributor::new call in main.rs, parses
incoming text messages as FarcasterMessage in token_connection.rs (logging the
message type and payload/data), and adds Farcaster direct-cast message types to
tunnelbroker_messages.

NOTE(review): generic type arguments were missing from the text this patch was
recovered from and have been restored. `from_str::<FarcasterMessage>` and
`serde_json::Map<String, serde_json::Value>` follow from the surrounding code;
`serde_json::Value` as the element type of `reactions`/`mentions` and as the
inner type of `payload`/`data` is an assumption - confirm against the repository.
NOTE(review): leading indentation inside hunks was also lost and is
reconstructed with 2-space indent; apply with whitespace tolerance if needed.

diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -90,8 +90,12 @@
     .await?;
 
   let token_config = TokenDistributorConfig::default();
-  let mut token_distributor =
-    TokenDistributor::new(db_client.clone(), token_config, &amqp_connection, grpc_client);
+  let mut token_distributor = TokenDistributor::new(
+    db_client.clone(),
+    token_config,
+    &amqp_connection,
+    grpc_client,
+  );
 
   tokio::select! {
     grpc_result = grpc_server => {
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -10,6 +10,7 @@
 use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, trace, warn};
+use tunnelbroker_messages::farcaster::FarcasterMessage;
 
 pub(crate) struct TokenConnection {
   db: DatabaseClient,
@@ -123,8 +124,8 @@
           Ok(false) => {
             warn!(
               "Lost token ownership for user {}, stopping reconnection attempts",
-              self.user_id)
-              ;
+              self.user_id
+            );
             return Err(TokenConnectionError::TokenOwnershipLost);
           }
           Err(err) => {
@@ -252,7 +253,38 @@
         Some(Ok(msg)) => match msg {
           Message::Text(text) => {
             info!("Received message for {}: {}", self.user_id, text);
-            //TODO: Handle incoming message
+            match serde_json::from_str::<FarcasterMessage>(&text) {
+              Ok(farcaster_msg) => {
+                debug!("Parsed Farcaster message type: {}", farcaster_msg.message_type);
+
+                match farcaster_msg.message_type.as_str() {
+                  "refresh-direct-cast-conversation" => {
+                    debug!("Processing refresh-direct-cast-conversation message");
+                    if let Some(payload) = &farcaster_msg.payload {
+                      debug!("Conversation payload: {}", payload);
+                    }
+                  }
+                  "refresh-self-direct-casts-inbox" => {
+                    debug!("Processing refresh-self-direct-casts-inbox message");
+                    if let Some(payload) = &farcaster_msg.payload {
+                      debug!("Inbox refresh payload: {}", payload);
+                    }
+                  }
+                  "unseen" => {
+                    debug!("Processing unseen message");
+                    if let Some(data) = &farcaster_msg.data {
+                      debug!("Unseen data: {}", data);
+                    }
+                  }
+                  _ => {
+                    info!("Unknown Farcaster message type: {}", farcaster_msg.message_type);
+                  }
+                }
+              }
+              Err(e) => {
+                warn!("Failed to parse message as Farcaster format: {}", e);
+              }
+            }
           }
           Message::Binary(_data) => {
             debug!("Received binary message for user: {}", self.user_id);
diff --git a/shared/tunnelbroker_messages/src/messages/farcaster.rs b/shared/tunnelbroker_messages/src/messages/farcaster.rs
--- a/shared/tunnelbroker_messages/src/messages/farcaster.rs
+++ b/shared/tunnelbroker_messages/src/messages/farcaster.rs
@@ -60,3 +60,94 @@
   pub request_id: String,
   pub response: FarcasterAPIResponseData,
 }
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct SenderContext {
+  pub display_name: String,
+  pub fid: u64,
+  pub pfp: ProfilePicture,
+  pub username: String,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+pub struct ProfilePicture {
+  pub url: String,
+  pub verified: bool,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct DirectCastMessage {
+  pub conversation_id: String,
+  pub message: String,
+  pub message_id: String,
+  pub sender_fid: u64,
+  pub server_timestamp: u64,
+  #[serde(rename = "type")]
+  pub message_type: String,
+  pub is_deleted: bool,
+  pub sender_context: SenderContext,
+  pub reactions: Vec<serde_json::Value>,
+  pub has_mention: bool,
+  pub is_pinned: bool,
+  pub mentions: Vec<serde_json::Value>,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(tag = "messageType")]
+pub enum FarcasterPayload {
+  #[serde(rename = "refresh-direct-cast-conversation")]
+  RefreshDirectCastConversation {
+    payload: RefreshDirectCastConversationPayload,
+    #[serde(flatten)]
+    extra: serde_json::Map<String, serde_json::Value>,
+  },
+  #[serde(rename = "refresh-self-direct-casts-inbox")]
+  RefreshSelfDirectCastsInbox {
+    payload: RefreshSelfDirectCastsInboxPayload,
+    #[serde(flatten)]
+    extra: serde_json::Map<String, serde_json::Value>,
+  },
+  #[serde(rename = "unseen")]
+  Unseen {
+    data: String,
+    #[serde(flatten)]
+    extra: serde_json::Map<String, serde_json::Value>,
+  },
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct RefreshDirectCastConversationPayload {
+  pub conversation_id: String,
+  pub message: DirectCastMessage,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct RefreshSelfDirectCastsInboxPayload {
+  pub conversation_id: String,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct FarcasterMessage {
+  pub message_type: String,
+  #[serde(skip_serializing_if = "Option::is_none")]
+  pub payload: Option<serde_json::Value>,
+  #[serde(skip_serializing_if = "Option::is_none")]
+  pub data: Option<serde_json::Value>,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}