diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -10,6 +10,7 @@
 use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, trace, warn};
+use tunnelbroker_messages::farcaster::{FarcasterMessage, FarcasterPayload};
 
 pub(crate) struct TokenConnection {
   db: DatabaseClient,
@@ -123,8 +124,8 @@
           Ok(false) => {
             warn!(
               "Lost token ownership for user {}, stopping reconnection attempts",
-              self.user_id)
-              ;
+              self.user_id
+            );
             return Err(TokenConnectionError::TokenOwnershipLost);
           }
           Err(err) => {
@@ -252,7 +253,25 @@
             Some(Ok(msg)) => match msg {
               Message::Text(text) => {
                 info!("Received message for {}: {}", self.user_id, text);
-                //TODO: Handle incoming message
+                match serde_json::from_str::<FarcasterMessage>(&text) {
+                  Ok(farcaster_msg) => {
+                    // Payloads are not consumed yet; each variant is only logged.
+                    match &farcaster_msg.payload {
+                      FarcasterPayload::RefreshDirectCastConversation { .. } => {
+                        debug!("Processing refresh-direct-cast-conversation message");
+                      }
+                      FarcasterPayload::RefreshSelfDirectCastsInbox { .. } => {
+                        debug!("Processing refresh-self-direct-casts-inbox message");
+                      }
+                      FarcasterPayload::Unseen { .. } => {
+                        debug!("Processing unseen message");
+                      }
+                    }
+                  }
+                  Err(e) => {
+                    warn!("Failed to parse message as Farcaster format: {}", e);
+                  }
+                }
               }
               Message::Binary(_data) => {
                 debug!("Received binary message for user: {}", self.user_id);
diff --git a/shared/tunnelbroker_messages/src/messages/farcaster.rs b/shared/tunnelbroker_messages/src/messages/farcaster.rs
--- a/shared/tunnelbroker_messages/src/messages/farcaster.rs
+++ b/shared/tunnelbroker_messages/src/messages/farcaster.rs
@@ -60,3 +60,276 @@
   pub request_id: String,
   pub response: FarcasterAPIResponseData,
 }
+
+/// Sender details attached to a [`DirectCastMessage`].
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct SenderContext {
+  pub display_name: String,
+  pub fid: u64,
+  pub pfp: ProfilePicture,
+  pub username: String,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+/// Profile picture (`pfp`) entry of a [`SenderContext`].
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+pub struct ProfilePicture {
+  pub url: String,
+  pub verified: bool,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+/// A direct-cast message; JSON keys not modelled here are kept in `extra`.
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct DirectCastMessage {
+  pub conversation_id: String,
+  pub message: String,
+  pub message_id: String,
+  pub sender_fid: u64,
+  pub server_timestamp: u64,
+  #[serde(rename = "type")]
+  pub message_type: String,
+  pub is_deleted: bool,
+  pub sender_context: SenderContext,
+  pub reactions: Vec<serde_json::Value>,
+  pub has_mention: bool,
+  pub is_pinned: bool,
+  pub mentions: Vec<serde_json::Value>,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+/// Message variants, selected by the `messageType` JSON tag.
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[serde(tag = "messageType")]
+pub enum FarcasterPayload {
+  #[serde(rename = "refresh-direct-cast-conversation")]
+  RefreshDirectCastConversation {
+    payload: RefreshDirectCastConversationPayload,
+    #[serde(flatten)]
+    extra: serde_json::Map<String, serde_json::Value>,
+  },
+  #[serde(rename = "refresh-self-direct-casts-inbox")]
+  RefreshSelfDirectCastsInbox {
+    payload: RefreshSelfDirectCastsInboxPayload,
+    #[serde(flatten)]
+    extra: serde_json::Map<String, serde_json::Value>,
+  },
+  #[serde(rename = "unseen")]
+  Unseen {
+    #[serde(flatten)]
+    extra: serde_json::Map<String, serde_json::Value>,
+  },
+}
+
+/// Body of a `refresh-direct-cast-conversation` message.
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct RefreshDirectCastConversationPayload {
+  pub conversation_id: String,
+  pub message: DirectCastMessage,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+/// Body of a `refresh-self-direct-casts-inbox` message.
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct RefreshSelfDirectCastsInboxPayload {
+  pub conversation_id: String,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+/// Top-level message: tagged payload flattened next to an optional `data`.
+// NOTE(review): `payload` and `extra` are both flattened; confirm `extra`
+// does not also capture `messageType`/`payload` and duplicate them on output.
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct FarcasterMessage {
+  #[serde(flatten)]
+  pub payload: FarcasterPayload,
+  #[serde(skip_serializing_if = "Option::is_none")]
+  pub data: Option<String>,
+  #[serde(flatten)]
+  pub extra: serde_json::Map<String, serde_json::Value>,
+}
+
+/// Wrapper carrying a single [`DirectCastMessage`], tagged via `type`.
+#[derive(
+  Serialize, Deserialize, TagAwareDeserialize, PartialEq, Debug, Clone,
+)]
+#[serde(tag = "type", remote = "Self", rename_all = "camelCase")]
+pub struct NewFarcasterMessage {
+  pub message: DirectCastMessage,
+}
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+
+  #[test]
+  fn test_refresh_direct_cast_conversation_parsing() {
+    let sample_json = r#"{
+      "messageType":"refresh-direct-cast-conversation",
+      "payload":{
+        "conversationId":"efa192faf954b2f8",
+        "message":{
+          "conversationId":"efa192faf954b2f8",
+          "message":"Test",
+          "messageId":"98548a664966761e4903fd6c20476500",
+          "senderFid":946308,
+          "serverTimestamp":1756219250040,
+          "type":"text",
+          "isDeleted":false,
+          "senderContext":{
+            "displayName":"Kamil",
+            "fid":946308,
+            "pfp":{
+              "url":"https://imagedelivery.net/BXluQx4ige9GuW0Ia56BHw/5ea8b176-f90d-4bcc-480b-5c903b0c7700/rectcrop3",
+              "verified":false
+            },
+            "username":"kamilswm"
+          },
+          "reactions":[],
+          "hasMention":false,
+          "isPinned":false,
+          "mentions":[]
+        }
+      },
+      "data":"efa192faf954b2f8"
+    }"#;
+
+    let parsed_message: FarcasterMessage = serde_json::from_str(sample_json)
+      .expect("Should parse the sample message correctly");
+
+    // Verify the data field
+    assert_eq!(parsed_message.data, Some("efa192faf954b2f8".to_string()));
+
+    // Verify the payload is parsed correctly
+    match parsed_message.payload {
+      FarcasterPayload::RefreshDirectCastConversation {
+        payload: refresh_payload,
+        ..
+      } => {
+        assert_eq!(refresh_payload.conversation_id, "efa192faf954b2f8");
+        assert_eq!(refresh_payload.message.message, "Test");
+        assert_eq!(refresh_payload.message.sender_fid, 946308);
+        assert_eq!(
+          refresh_payload.message.sender_context.display_name,
+          "Kamil"
+        );
+        assert_eq!(refresh_payload.message.sender_context.username, "kamilswm");
+      }
+      _ => panic!("Expected RefreshDirectCastConversation payload"),
+    }
+  }
+
+  #[test]
+  fn test_refresh_direct_cast_conversation_serialization() {
+    // Create a test message
+    let test_message = FarcasterMessage {
+      payload: FarcasterPayload::RefreshDirectCastConversation {
+        payload: RefreshDirectCastConversationPayload {
+          conversation_id: "test123".to_string(),
+          message: DirectCastMessage {
+            conversation_id: "test123".to_string(),
+            message: "Hello".to_string(),
+            message_id: "msg456".to_string(),
+            sender_fid: 123,
+            server_timestamp: 1000000,
+            message_type: "text".to_string(),
+            is_deleted: false,
+            sender_context: SenderContext {
+              display_name: "Test User".to_string(),
+              fid: 123,
+              pfp: ProfilePicture {
+                url: "https://example.com/pic.jpg".to_string(),
+                verified: true,
+                extra: serde_json::Map::new(),
+              },
+              username: "testuser".to_string(),
+              extra: serde_json::Map::new(),
+            },
+            reactions: vec![],
+            has_mention: false,
+            is_pinned: false,
+            mentions: vec![],
+            extra: serde_json::Map::new(),
+          },
+          extra: serde_json::Map::new(),
+        },
+        extra: serde_json::Map::new(),
+      },
+      data: Some("test123".to_string()),
+      extra: serde_json::Map::new(),
+    };
+
+    // Serialize and verify it works
+    let serialized =
+      serde_json::to_string(&test_message).expect("Should serialize correctly");
+
+    // Verify the serialized JSON contains the expected fields
+    assert!(serialized
+      .contains("\"messageType\":\"refresh-direct-cast-conversation\""));
+    assert!(serialized.contains("\"data\":\"test123\""));
+    assert!(serialized.contains("\"conversationId\":\"test123\""));
+
+    // Deserialize back and verify the payload type is correct
+    let deserialized: FarcasterMessage =
+      serde_json::from_str(&serialized).expect("Should deserialize correctly");
+
+    match deserialized.payload {
+      FarcasterPayload::RefreshDirectCastConversation { payload, .. } => {
+        assert_eq!(payload.conversation_id, "test123");
+        assert_eq!(payload.message.message, "Hello");
+      }
+      _ => panic!("Expected RefreshDirectCastConversation payload"),
+    }
+  }
+
+  #[test]
+  fn test_unseen_parsing() {
+    let unseen_json = r#"{"messageType":"unseen","data":"{\"inboxCount\":1}"}"#;
+
+    let parsed_message: FarcasterMessage = serde_json::from_str(unseen_json)
+      .expect("Should parse the unseen message correctly");
+
+    // Verify the data field
+    assert_eq!(parsed_message.data, Some("{\"inboxCount\":1}".to_string()));
+
+    // Verify the payload is parsed correctly
+    match parsed_message.payload {
+      FarcasterPayload::Unseen { .. } => {
+        // For unseen messages, the data is in the top-level data field,
+        // not in the payload
+      }
+      _ => panic!("Expected Unseen payload"),
+    }
+  }
+
+  #[test]
+  fn test_refresh_self_direct_casts_inbox_parsing() {
+    let inbox_json = r#"{
+      "messageType":"refresh-self-direct-casts-inbox",
+      "payload":{"conversationId":"efa192faf954b2f8"}
+    }"#;
+
+    let parsed_message: FarcasterMessage = serde_json::from_str(inbox_json)
+      .expect(
+        "Should parse the refresh-self-direct-casts-inbox message correctly",
+      );
+
+    // Verify the payload is parsed correctly
+    match parsed_message.payload {
+      FarcasterPayload::RefreshSelfDirectCastsInbox { payload, .. } => {
+        assert_eq!(payload.conversation_id, "efa192faf954b2f8");
+      }
+      _ => panic!("Expected RefreshSelfDirectCastsInbox payload"),
+    }
+  }
+}