diff --git a/lib/shared/farcaster/farcaster-hooks.js b/lib/shared/farcaster/farcaster-hooks.js --- a/lib/shared/farcaster/farcaster-hooks.js +++ b/lib/shared/farcaster/farcaster-hooks.js @@ -23,7 +23,11 @@ type FarcasterMessage, farcasterMessageValidator, } from './farcaster-messages-types.js'; -import { logTypes, useDebugLogs } from '../../components/debug-logs-context.js'; +import { + type AddLogCallback, + logTypes, + useDebugLogs, +} from '../../components/debug-logs-context.js'; import { useIsUserDataReady } from '../../hooks/backup-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useGetCommFCUsersForFIDs } from '../../hooks/user-identities-hooks.js'; @@ -161,6 +165,8 @@ operation: () => Promise, maxRetries: number = MAX_RETRIES, delayMs: number = RETRY_DELAY_MS, + addLog?: AddLogCallback, + operationName?: string, ): Promise { let lastError; @@ -170,6 +176,18 @@ } catch (error) { lastError = error; + if (addLog && operationName) { + addLog( + `Farcaster: Retry attempt ${attempt}/${maxRetries + 1} failed for ${operationName}`, + JSON.stringify({ + attempt, + maxRetries, + error: getMessageForException(error), + }), + new Set([logTypes.FARCASTER]), + ); + } + if (attempt <= maxRetries) { const delay = delayMs * attempt; await sleep(delay); @@ -185,8 +203,11 @@ processor: (item: T, batchedUpdates: BatchedUpdates) => Promise, dispatch: Dispatch, onProgress?: (completed: number, total: number) => void, + addLog?: AddLogCallback, ): Promise> { const results: Array = []; + let failedItemsCount = 0; + for (let i = 0; i < items.length; i += batchSize) { const batch = items.slice(i, i + batchSize); @@ -197,6 +218,17 @@ return { result, updates: itemBatchedUpdates }; } catch (error) { console.log('Error processing item:', item, 'Error:', error); + if (addLog) { + addLog( + 'Farcaster: Failed to process item in batch', + JSON.stringify({ + item: typeof item === 'string' ? item : 'complex_item', + batchIndex: i, + error: getMessageForException(error), + }), + new Set([logTypes.FARCASTER]), + ); + } return null; } }); @@ -204,6 +236,7 @@ const batchResults = await Promise.all(batchPromises); let batchedUpdates = new BatchedUpdates(); + let nullResultsInBatch = 0; for (const itemResult of batchResults) { if (itemResult) { @@ -218,9 +251,24 @@ }); batchedUpdates = new BatchedUpdates(); } + } else { + nullResultsInBatch++; + failedItemsCount++; } } + if (nullResultsInBatch > 0 && addLog) { + addLog( + 'Farcaster: Some items in batch returned null results', + JSON.stringify({ + nullCount: nullResultsInBatch, + batchSize: batch.length, + batchIndex: i, + }), + new Set([logTypes.FARCASTER]), + ); + } + if (!batchedUpdates.isEmpty()) { dispatch({ type: processFarcasterOpsActionType, @@ -235,6 +283,21 @@ await sleep(0); } + if (failedItemsCount > 0 && addLog) { + addLog( + 'Farcaster: Total failed items in processing', + JSON.stringify({ + failedCount: failedItemsCount, + totalCount: items.length, + successRate: + (((items.length - failedItemsCount) / items.length) * 100).toFixed( + 2, + ) + '%', + }), + new Set([logTypes.FARCASTER]), + ); + } + return results; } @@ -249,6 +312,7 @@ conversationId: string, }) => Promise, fetchUsersByFIDs: GetCommFCUsersForFIDs, + addLog?: AddLogCallback, ): Promise { const conversationResult = await withRetry( () => @@ -257,16 +321,63 @@ }), MAX_RETRIES, RETRY_DELAY_MS, + addLog, + `fetchConversation(${conversationID})`, ); if (!conversationResult) { + if (addLog) { + addLog( + 'Farcaster: Conversation result is null after retries', + JSON.stringify({ conversationID }), + new Set([logTypes.FARCASTER]), + ); + } + return null; + } + + if (!conversationResult.result?.conversation) { + if (addLog) { + addLog( + 'Farcaster: Invalid conversation result structure', + JSON.stringify({ + conversationID, + hasResult: !!conversationResult.result, + hasConversation: !!conversationResult.result?.conversation, + }), + new Set([logTypes.FARCASTER]), + ); + } return null; } const farcasterConversation = conversationResult.result.conversation; let thread = createFarcasterRawThreadInfo(farcasterConversation); const fids = thread.members.map(member => member.id); + + if (fids.length === 0 && addLog) { + addLog( + 'Farcaster: No members (FIDs) found in conversation', + JSON.stringify({ conversationID }), + new Set([logTypes.FARCASTER]), + ); + } + const commFCUsersForFIDs = await fetchUsersByFIDs(fids); + + if (commFCUsersForFIDs.size !== fids.length && addLog) { + addLog( + 'Farcaster: FID to user mapping incomplete', + JSON.stringify({ + conversationID, + requestedFIDs: fids.length, + resolvedUsers: commFCUsersForFIDs.size, + missingFIDs: fids.filter(fid => !commFCUsersForFIDs.has(fid)), + }), + new Set([logTypes.FARCASTER]), + ); + } + const threadMembers = thread.members.map( member => ({ @@ -302,9 +413,15 @@ conversationID, fetchFarcasterConversation, fetchUsersByFIDs, + addLog, ); if (!result) { + addLog( + 'Farcaster: No result while fetching conversation', + JSON.stringify({ conversationID }), + new Set([logTypes.FARCASTER]), + ); return null; } @@ -320,7 +437,13 @@ rawEntryInfos: [], }; - if (threadMembers.length > 0) { + if (threadMembers.length === 0) { + addLog( + 'Farcaster: No thread members found for conversation', + JSON.stringify({ conversationID }), + new Set([logTypes.FARCASTER]), + ); + } else { threadMembers.forEach(member => batchedUpdates.addUserID(member.id)); } batchedUpdates.addUpdateInfo(update); @@ -363,15 +486,27 @@ conversationID, fetchFarcasterConversation, fetchUsersByFIDs, + addLog, ); if (!result) { + addLog( + 'Farcaster: No result while fetching conversation with messages', + JSON.stringify({ conversationID, messagesLimit }), + new Set([logTypes.FARCASTER]), + ); return null; } const { farcasterConversation, thread, threadMembers } = result; - if (threadMembers.length > 0) { + if (threadMembers.length === 0) { + addLog( + 'Farcaster: No thread members in conversation with messages', + JSON.stringify({ conversationID }), + new Set([logTypes.FARCASTER]), + ); + } else { threadMembers.forEach(member => batchedUpdates.addUserID(member.id)); } @@ -379,6 +514,15 @@ conversationID, messagesLimit, ); + + if (messagesResult.messages.length === 0) { + addLog( + 'Farcaster: No messages fetched for conversation', + JSON.stringify({ conversationID, messagesLimit }), + new Set([logTypes.FARCASTER]), + ); + } + batchedUpdates.addUserIDs(messagesResult.userIDs); const reduxMessages = messagesResult.messages.slice( @@ -453,11 +597,14 @@ }> => { const result: Array = []; const userIDs: Array = []; + let batchNumber = 0; + try { let totalMessagesFetched = 0; let lastSeenMessageID: ?string = null; do { + batchNumber++; const batchLimit = Math.min( 50, messagesNumberLimit - totalMessagesFetched, @@ -477,16 +624,40 @@ () => fetchFarcasterMessages(messagesInput), MAX_RETRIES, RETRY_DELAY_MS, + addLog, + `fetchMessages(${conversationID}, batch=${batchNumber})`, ); if (messagesResult) { const farcasterMessages = messagesResult.result.messages; + + if (farcasterMessages.length === 0) { + addLog( + 'Farcaster: Empty message batch received', + JSON.stringify({ + conversationID, + batchNumber, + hasCursor: !!cursor, + }), + new Set([logTypes.FARCASTER]), + ); + } + const lastMessageID = farcasterMessages.length > 0 ? farcasterMessages[farcasterMessages.length - 1].messageId : null; if (lastMessageID === lastSeenMessageID) { + addLog( + 'Farcaster: Duplicate message batch detected (breaking loop)', + JSON.stringify({ + conversationID, + lastMessageID, + batchNumber, + }), + new Set([logTypes.FARCASTER]), + ); break; } lastSeenMessageID = lastMessageID; @@ -499,10 +670,36 @@ ); const fcUserInfos = await fetchUsersByFIDs(userFIDs); + if (fcUserInfos.size !== userFIDs.length) { + addLog( + 'Farcaster: Not all FIDs resolved to users in message batch', + JSON.stringify({ + conversationID, + requestedFIDs: userFIDs.length, + resolvedUsers: fcUserInfos.size, + batchNumber, + }), + new Set([logTypes.FARCASTER]), + ); + } + const rawMessageInfos = farcasterMessages.flatMap(message => convertFarcasterMessageToCommMessages(message, fcUserInfos), ); + if (rawMessageInfos.length < farcasterMessages.length) { + addLog( + 'Farcaster: Some messages failed to convert', + JSON.stringify({ + conversationID, + farcasterMessages: farcasterMessages.length, + convertedMessages: rawMessageInfos.length, + batchNumber, + }), + new Set([logTypes.FARCASTER]), + ); + } + userIDs.push( ...Array.from(fcUserInfos.entries()).map( ([fid, user]) => user?.userID ?? userIDFromFID(fid), @@ -525,6 +722,7 @@ conversationID, messagesNumberLimit, cursor, + batchNumber, error: getMessageForException(e), }), new Set([logTypes.FARCASTER]), @@ -580,8 +778,10 @@ ): Promise<$ReadOnlyArray> => { const allConversations: Array = []; let currentCursor = null; + let pageNumber = 0; while (true) { + pageNumber++; try { let input = { limit: 50, category }; if (currentCursor) { @@ -594,8 +794,22 @@ () => fetchFarcasterInbox(input), MAX_RETRIES, RETRY_DELAY_MS, + addLog, + `fetchInbox(${category || 'main'}, page=${pageNumber})`, ); + if (result.conversations.length === 0) { + addLog( + 'Farcaster: Empty inbox page received', + JSON.stringify({ + category: category || 'main', + pageNumber, + hasNextCursor: !!next?.cursor, + }), + new Set([logTypes.FARCASTER]), + ); + } + allConversations.push(...result.conversations); if (next?.cursor) { @@ -609,6 +823,8 @@ JSON.stringify({ category: category || 'main', cursor: currentCursor, + pageNumber, + conversationsFetchedSoFar: allConversations.length, error: getMessageForException(e), }), new Set([logTypes.FARCASTER]), @@ -641,6 +857,7 @@ ) => void { const dispatch = useDispatch(); const threadInfos = useSelector(state => state.threadStore.threadInfos); + const { addLog } = useDebugLogs(); return React.useCallback( (conversations: $ReadOnlyArray) => { @@ -662,6 +879,14 @@ threadID: threadInfo.id, })); + addLog( + 'Farcaster: Removing dead threads', + JSON.stringify({ + removedThreads: updateInfos.map(updateInfo => updateInfo.threadID), + }), + new Set([logTypes.FARCASTER]), + ); + dispatch({ type: processFarcasterOpsActionType, payload: { @@ -670,27 +895,22 @@ }, }); }, - [dispatch, threadInfos], + [addLog, dispatch, threadInfos], ); } function useFarcasterConversationsSync(): ( - limit: number, onProgress?: (completed: number, total: number) => void, ) => Promise { const dispatch = useDispatch(); const fetchConversationWithMessages = useFetchConversationWithMessages(); const setFarcasterDCsLoaded = useSetFarcasterDCsLoaded(); const { addLog } = useDebugLogs(); - const threadInfos = useSelector(state => state.threadStore.threadInfos); const fetchInboxes = useFetchInboxIDs(); const removeDeadThreads = useRemoveDeadThreads(); return React.useCallback( - async ( - limit: number, - onProgress?: (completed: number, total: number) => void, - ) => { + async (onProgress?: (completed: number, total: number) => void) => { try { const inboxResults = await Promise.all([ fetchInboxes(), @@ -702,20 +922,15 @@ removeDeadThreads(conversations); if (conversations.length === 0) { + addLog( + 'Farcaster: No conversations to sync', + JSON.stringify({}), + new Set([logTypes.FARCASTER]), + ); setFarcasterDCsLoaded(true); return; } - const threadIDs = new Set(Object.keys(threadInfos)); - const newConversations = new Set( - conversations.filter( - conversationID => - !threadIDs.has( - farcasterThreadIDFromConversationID(conversationID), - ), - ), - ); - onProgress?.(0, conversations.length); await processInBatchesWithReduxBatching( conversations, @@ -723,13 +938,12 @@ (conversationID, batchedUpdates) => fetchConversationWithMessages( conversationID, - newConversations.has(conversationID) - ? Number.POSITIVE_INFINITY - : limit, + Number.POSITIVE_INFINITY, batchedUpdates, ), dispatch, (completed, total) => onProgress?.(completed, total), + addLog, ); setFarcasterDCsLoaded(true); @@ -751,7 +965,6 @@ fetchInboxes, removeDeadThreads, setFarcasterDCsLoaded, - threadInfos, ], ); } @@ -845,6 +1058,7 @@ existingConversationIDs.length + completed, conversations.length, ), + addLog, ); } } catch (e) { @@ -880,13 +1094,16 @@ const threadInfos = useSelector(state => state.threadStore.threadInfos); const fetchConversationWithMessages = useFetchConversationWithMessages(); const currentlyFetchedConversations = React.useRef>(new Set()); + const { addLog } = useDebugLogs(); return React.useCallback( async (farcasterMessage: FarcasterMessage) => { + const threadID = farcasterThreadIDFromConversationID( + farcasterMessage.conversationId, + ); + if ( - !threadInfos[ - farcasterThreadIDFromConversationID(farcasterMessage.conversationId) - ] && + !threadInfos[threadID] && !currentlyFetchedConversations.current.has( farcasterMessage.conversationId, ) @@ -919,6 +1136,18 @@ farcasterMessage, fcUserInfos, ); + + if (rawMessageInfos.length === 0) { + addLog( + 'Farcaster: Failed to convert new message to Comm messages', + JSON.stringify({ + conversationID: farcasterMessage.conversationId, + messageID: farcasterMessage.messageId, + }), + new Set([logTypes.FARCASTER]), + ); + } + const userIDs = userFIDs.map(fid => userIDFromFID(`${fid}`)); const updates: Array = []; @@ -942,6 +1171,7 @@ }); }, [ + addLog, dispatch, fetchConversationWithMessages, fetchMessage, @@ -1018,10 +1248,7 @@ setProgress(null); void (async () => { try { - await syncFarcasterConversations( - Number.POSITIVE_INFINITY, - handleProgress, - ); + await syncFarcasterConversations(handleProgress); } finally { setInProgress(false); setProgress(null);