diff --git a/lib/shared/farcaster/farcaster-hooks.js b/lib/shared/farcaster/farcaster-hooks.js --- a/lib/shared/farcaster/farcaster-hooks.js +++ b/lib/shared/farcaster/farcaster-hooks.js @@ -1326,6 +1326,92 @@ ]); } +function useHandleFarcasterInboxStatus(): ( + unreadConversationIds: $ReadOnlyArray, + readConversationIds: $ReadOnlyArray, +) => Promise { + const dispatch = useDispatch(); + const threadInfos = useSelector(state => state.threadStore.threadInfos); + const fetchConversationWithBatching = useFetchConversationWithBatching(); + const { addLog } = useDebugLogs(); + + return React.useCallback( + async ( + unreadConversationIds: $ReadOnlyArray, + readConversationIds: $ReadOnlyArray, + ) => { + try { + const allConversationIds = [ + ...unreadConversationIds, + ...readConversationIds, + ]; + + const unreadStatusMap = new Map(); + unreadConversationIds.forEach(id => unreadStatusMap.set(id, true)); + readConversationIds.forEach(id => unreadStatusMap.set(id, false)); + + const updateInfos: Array = []; + const unknownConversationIds: Array = []; + + for (const conversationID of allConversationIds) { + const threadID = farcasterThreadIDFromConversationID(conversationID); + const threadInfo = threadInfos[threadID]; + const expectedUnreadStatus = unreadStatusMap.get(conversationID); + + if (!threadInfo) { + unknownConversationIds.push(conversationID); + } else if ( + threadInfo.currentUser.unread !== expectedUnreadStatus && + expectedUnreadStatus !== undefined + ) { + updateInfos.push({ + id: uuid.v4(), + type: updateTypes.UPDATE_THREAD_READ_STATUS, + time: Date.now(), + threadID, + unread: expectedUnreadStatus, + }); + } + } + + if (unknownConversationIds.length > 0) { + await processInBatchesWithReduxBatching( + unknownConversationIds, + FARCASTER_DATA_BATCH_SIZE, + (conversationID, batchedUpdates) => + fetchConversationWithBatching(conversationID, batchedUpdates), + dispatch, + undefined, + addLog, + ); + } + + if (updateInfos.length > 0) { + dispatch({ + type: processFarcasterOpsActionType, + payload: { + rawMessageInfos: [], + updateInfos, + }, + }); + } + } catch (e) { + addLog( + 'Farcaster: Failed to handle inbox status', + JSON.stringify({ + error: getMessageForException(e), + unreadCount: unreadConversationIds.length, + readCount: readConversationIds.length, + }), + new Set([logTypes.FARCASTER]), + ); + throw e; + } + }, + [addLog, dispatch, fetchConversationWithBatching, threadInfos], + ); +} + export { useFarcasterConversationsSync, useLightweightFarcasterConversationsSync, @@ -1338,4 +1424,5 @@ useFarcasterSync, useFarcasterThreadRefresher, useLightweightSyncOnAppStart, + useHandleFarcasterInboxStatus, }; diff --git a/lib/tunnelbroker/use-peer-to-peer-message-handler.js b/lib/tunnelbroker/use-peer-to-peer-message-handler.js --- a/lib/tunnelbroker/use-peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/use-peer-to-peer-message-handler.js @@ -34,7 +34,10 @@ } from '../shared/device-list-utils.js'; import { dmOperationSpecificationTypes } from '../shared/dm-ops/dm-op-types.js'; import { useProcessDMOperation } from '../shared/dm-ops/process-dm-ops.js'; -import { useAddNewFarcasterMessage } from '../shared/farcaster/farcaster-hooks.js'; +import { + useAddNewFarcasterMessage, + useHandleFarcasterInboxStatus, +} from '../shared/farcaster/farcaster-hooks.js'; import { IdentityClientContext } from '../shared/identity-client-context.js'; import type { IdentityServiceClient, @@ -318,6 +321,7 @@ const { createOlmSessionsWithUser } = usePeerOlmSessionsCreatorContext(); const addNewFarcasterMessage = useAddNewFarcasterMessage(); + const handleFarcasterInboxStatus = useHandleFarcasterInboxStatus(); const { addLog } = useDebugLogs(); const olmDebugLog = useOlmDebugLogs(); @@ -669,6 +673,21 @@ }`, ); } + } else if ( + message.type === peerToPeerMessageTypes.FARCASTER_INBOX_STATUS + ) { + try { + await handleFarcasterInboxStatus( + message.unreadConversationIds, + message.readConversationIds, + ); + } catch (e) { + console.log( + `Error processing Farcaster inbox status: ${ + getMessageForException(e) ?? 'unknown error' + }`, + ); + } } }, [ @@ -687,6 +706,7 @@ handlePrimaryDeviceChanges, sqliteAPI, addNewFarcasterMessage, + handleFarcasterInboxStatus, ], ); } diff --git a/lib/types/tunnelbroker/farcaster-messages-types.js b/lib/types/tunnelbroker/farcaster-messages-types.js --- a/lib/types/tunnelbroker/farcaster-messages-types.js +++ b/lib/types/tunnelbroker/farcaster-messages-types.js @@ -116,3 +116,16 @@ type: tString('NewFarcasterMessage'), message: farcasterMessageValidator, }); + +export type FarcasterInboxStatus = { + +type: 'FarcasterInboxStatus', + +unreadConversationIds: $ReadOnlyArray, + +readConversationIds: $ReadOnlyArray, +}; + +export const farcasterInboxStatusValidator: TInterface = + tShape({ + type: tString('FarcasterInboxStatus'), + unreadConversationIds: t.list(t.String), + readConversationIds: t.list(t.String), + }); diff --git a/lib/types/tunnelbroker/peer-to-peer-message-types.js b/lib/types/tunnelbroker/peer-to-peer-message-types.js --- a/lib/types/tunnelbroker/peer-to-peer-message-types.js +++ b/lib/types/tunnelbroker/peer-to-peer-message-types.js @@ -3,8 +3,14 @@ import type { TInterface, TUnion } from 'tcomb'; import t from 'tcomb'; -import type { NewFarcasterMessage } from './farcaster-messages-types.js'; -import { newFarcasterMessageValidator } from './farcaster-messages-types.js'; +import type { + NewFarcasterMessage, + FarcasterInboxStatus, +} from './farcaster-messages-types.js'; +import { + newFarcasterMessageValidator, + farcasterInboxStatusValidator, +} from './farcaster-messages-types.js'; import { tShape, tString, tUserID } from '../../utils/validation-utils.js'; import { type EncryptedData, encryptedDataValidator } from '../crypto-types.js'; import { @@ -31,6 +37,7 @@ IDENTITY_DEVICE_LIST_UPDATED: 'IdentityDeviceListUpdated', BAD_DEVICE_TOKEN: 'BadDeviceToken', NEW_FARCASTER_MESSAGE: 'NewFarcasterMessage', + FARCASTER_INBOX_STATUS: 'FarcasterInboxStatus', }); export type OutboundSessionCreation = { @@ -132,7 +139,8 @@ | MessageProcessed | IdentityDeviceListUpdated | BadDeviceToken - | NewFarcasterMessage; + | NewFarcasterMessage + | FarcasterInboxStatus; export const peerToPeerMessageValidator: TUnion = t.union([ outboundSessionCreationValidator, @@ -144,4 +152,5 @@ identityDeviceListUpdatedValidator, badDeviceTokenValidator, newFarcasterMessageValidator, + farcasterInboxStatusValidator, ]); diff --git a/services/tunnelbroker/src/farcaster/mod.rs b/services/tunnelbroker/src/farcaster/mod.rs --- a/services/tunnelbroker/src/farcaster/mod.rs +++ b/services/tunnelbroker/src/farcaster/mod.rs @@ -10,6 +10,7 @@ use tracing::{debug, info, warn}; use tunnelbroker_messages::farcaster::{ APIMethod, DirectCastConversation, FarcasterAPIRequest, + FarcasterInboxConversation, }; pub mod error; @@ -80,6 +81,65 @@ Ok(converstion) } + pub async fn fetch_inbox( + &self, + calling_user_id: &str, + ) -> Result, error::Error> { + debug!("Fetching FC inbox for user={}", calling_user_id); + + let mut all_conversations = Vec::new(); + let mut cursor: Option = None; + + loop { + let payload = if let Some(ref cursor_value) = cursor { + format!("limit=50&cursor={}", cursor_value) + } else { + "limit=50".to_string() + }; + + let (status, response_text) = self + .api_request(FarcasterAPIRequest { + request_id: uuid::Uuid::new_v4().to_string(), + user_id: calling_user_id.to_string(), + api_version: "v2".to_string(), + endpoint: "direct-cast-inbox".to_string(), + method: APIMethod::GET, + payload, + }) + .await?; + + if !status.is_success() { + return Err(status.into()); + } + + let response_value: serde_json::Value = + serde_json::from_str(&response_text)?; + + let conversations: Vec = + serde_json::from_value( + response_value["result"]["conversations"].clone(), + )?; + + all_conversations.extend(conversations); + + cursor = response_value["next"]["cursor"] + .as_str() + .map(|s| s.to_string()); + + if cursor.is_none() { + break; + } + } + + debug!( + "Fetched {} conversations from inbox for user={}", + all_conversations.len(), + calling_user_id + ); + + Ok(all_conversations) + } + pub async fn api_request( &self, request: FarcasterAPIRequest, diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -26,8 +26,8 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; use tunnelbroker_messages::farcaster::{ - DirectCastMessage, FarcasterMessage, FarcasterPayload, NewFarcasterMessage, - RefreshDirectCastConversationPayload, + DirectCastMessage, FarcasterInboxStatus, FarcasterMessage, FarcasterPayload, + NewFarcasterMessage, RefreshDirectCastConversationPayload, }; pub(crate) struct TokenConnection { @@ -341,7 +341,17 @@ debug!("Processing refresh-self-direct-casts-inbox message"); } FarcasterPayload::Unseen { .. } => { - debug!("Processing unseen message"); + if let Err(e) = self.handle_unseen_message(&mut client).await { + info!( + metricType = "TokenDistributor_ConnectionFailure", + metricValue = 1, + instanceId = self.config.instance_id, + userId = redact_sensitive_data(user_id), + errorType = "MessageHandlingFailed", + "Failed to handle unseen message: {:?}", + e + ); + } } } } @@ -655,6 +665,64 @@ Ok(result) } + async fn handle_unseen_message( + &self, + client: &mut ChainedInterceptedServicesAuthClient, + ) -> Result<(), TokenConnectionError> { + let conversations = self + .farcaster_client + .fetch_inbox(&self.token_info.user_id) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to fetch inbox: {e:?}", + )) + })?; + + let mut unread_conversation_ids = Vec::new(); + let mut read_conversation_ids = Vec::new(); + + for conversation in conversations { + if conversation.viewer_context.unread_count > 0 + || conversation.viewer_context.manually_marked_unread + { + unread_conversation_ids.push(conversation.conversation_id); + } else { + read_conversation_ids.push(conversation.conversation_id); + } + } + + let inbox_status = FarcasterInboxStatus { + unread_conversation_ids, + read_conversation_ids, + }; + + let message_payload = + serde_json::to_string(&inbox_status).map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to serialize inbox status: {}", + e + )) + })?; + + let recipient_devices = self.get_self_user_device_list(client).await?; + + for (device_id, _) in &recipient_devices { + self + .message_sender + .simple_send_message_to_device(device_id, message_payload.clone()) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to send inbox status message: {}", + e + )) + })?; + } + + Ok(()) + } + async fn mirror_media_to_blob( &self, medias: Vec, diff --git a/shared/tunnelbroker_messages/src/messages/farcaster.rs b/shared/tunnelbroker_messages/src/messages/farcaster.rs --- a/shared/tunnelbroker_messages/src/messages/farcaster.rs +++ b/shared/tunnelbroker_messages/src/messages/farcaster.rs @@ -141,6 +141,29 @@ pub extra: serde_json::Map, } +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct FarcasterInboxViewerContext { + pub category: String, + pub last_read_at: u64, + pub muted: bool, + pub manually_marked_unread: bool, + pub pinned: bool, + pub unread_count: u64, + pub unread_mentions_count: u64, + #[serde(flatten)] + pub extra: serde_json::Map, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct FarcasterInboxConversation { + pub conversation_id: String, + pub viewer_context: FarcasterInboxViewerContext, + #[serde(flatten)] + pub extra: serde_json::Map, +} + impl DirectCastConversation { pub fn participant(&self, fid: u64) -> Option<&DirectCastUser> { self.participants.iter().find(|u| u.fid == fid) @@ -205,6 +228,15 @@ pub message: DirectCastMessage, } +#[derive( + Serialize, Deserialize, TagAwareDeserialize, PartialEq, Debug, Clone, +)] +#[serde(tag = "type", remote = "Self", rename_all = "camelCase")] +pub struct FarcasterInboxStatus { + pub unread_conversation_ids: Vec, + pub read_conversation_ids: Vec, +} + #[cfg(test)] mod tests { use super::*;