diff --git a/lib/reducers/user-reducer.js b/lib/reducers/user-reducer.js --- a/lib/reducers/user-reducer.js +++ b/lib/reducers/user-reducer.js @@ -30,6 +30,7 @@ type UserStoreOperation, userStoreOpsHandlers, } from '../ops/user-store-ops.js'; +import { processFarcasterOpsActionType } from '../shared/farcaster/farcaster-actions.js'; import { stateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { updateSpecs } from '../shared/updates/update-specs.js'; import type { BaseAction } from '../types/redux-types.js'; @@ -284,10 +285,15 @@ state: UserStore, action: BaseAction, ): ReduceUserInfosResult { - if (action.type === processNewUserIDsActionType) { - const filteredUserIDs = action.payload.userIDs.filter( - id => !state.userInfos[id], - ); + if ( + action.type === processNewUserIDsActionType || + action.type === processFarcasterOpsActionType + ) { + const userIDs = + action.type === processNewUserIDsActionType + ? action.payload.userIDs + : action.payload.userIDs || []; + const filteredUserIDs = userIDs.filter(id => !state.userInfos[id]); if (filteredUserIDs.length === 0) { return [state, [], []]; } diff --git a/lib/shared/farcaster/farcaster-actions.js b/lib/shared/farcaster/farcaster-actions.js --- a/lib/shared/farcaster/farcaster-actions.js +++ b/lib/shared/farcaster/farcaster-actions.js @@ -9,4 +9,5 @@ +rawMessageInfos: $ReadOnlyArray, +additionalMessageInfos?: $ReadOnlyArray, +updateInfos: $ReadOnlyArray, + +userIDs?: $ReadOnlyArray, }; diff --git a/lib/shared/farcaster/farcaster-hooks.js b/lib/shared/farcaster/farcaster-hooks.js --- a/lib/shared/farcaster/farcaster-hooks.js +++ b/lib/shared/farcaster/farcaster-hooks.js @@ -12,50 +12,154 @@ } from './farcaster-api.js'; import type { FarcasterConversation } from './farcaster-conversation-types.js'; import { - farcasterMessageValidator, type FarcasterMessage, + farcasterMessageValidator, } from './farcaster-messages-types.js'; -import { processNewUserIDsActionType } from '../../actions/user-actions.js'; import { useGetCommFCUsersForFIDs } from '../../hooks/user-identities-hooks.js'; +import type { RawMessageInfo } from '../../types/message-types.js'; import { messageTruncationStatus } from '../../types/message-types.js'; +import type { Dispatch } from '../../types/redux-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; +import type { ClientUpdateInfo } from '../../types/update-types.js'; import { extractFarcasterIDsFromPayload } from '../../utils/conversion-utils.js'; import { convertFarcasterMessageToCommMessages } from '../../utils/convert-farcaster-message-to-comm-messages.js'; import { createFarcasterRawThreadInfo } from '../../utils/create-farcaster-raw-thread-info.js'; import { useSetFarcasterDCsLoaded } from '../../utils/farcaster-utils.js'; import { useDispatch } from '../../utils/redux-utils.js'; +import sleep from '../../utils/sleep.js'; import { useSendDMOperationUtils } from '../dm-ops/dm-op-utils.js'; import { userIDFromFID } from '../id-utils.js'; -const FARCASTER_DATA_BATCH_SIZE = 3; +const FARCASTER_DATA_BATCH_SIZE = 20; +const MAX_RETRIES = 3; +const RETRY_DELAY_MS = 1000; + +type BatchedUpdates = { + +userIDs: Set, + +updateInfos: Array, + +messageInfos: Array, + +additionalMessageInfos: Array, +}; + +async function withRetry( + operation: () => Promise, + maxRetries: number = MAX_RETRIES, + delayMs: number = RETRY_DELAY_MS, +): Promise { + let lastError; + + for (let attempt = 1; attempt <= maxRetries + 1; attempt++) { + try { + return await operation(); + } catch (error) { + lastError = error; + + if (attempt <= maxRetries) { + const delay = delayMs * attempt; + await sleep(delay); + } + } + } + throw lastError; +} -async function processInBatches( +async function processInBatchesWithReduxBatching( items: $ReadOnlyArray, batchSize: number, - processor: (item: T) => Promise, + processor: (item: T, batchedUpdates: BatchedUpdates) => Promise, + dispatch: Dispatch, ): Promise> { const results: Array = []; for (let i = 0; i < items.length; i += batchSize) { const batch = items.slice(i, i + batchSize); - const batchResults = await Promise.all(batch.map(processor)); - results.push(...batchResults); + + const batchPromises = batch.map(async item => { + try { + const itemBatchedUpdates: BatchedUpdates = { + userIDs: new Set(), + updateInfos: ([]: Array), + messageInfos: ([]: Array), + additionalMessageInfos: ([]: Array), + }; + const result = await processor(item, itemBatchedUpdates); + return { result, updates: itemBatchedUpdates }; + } catch (error) { + console.log('Error processing item:', item, 'Error:', error); + return null; + } + }); + + const batchResults = await Promise.all(batchPromises); + + const allUserIDs = new Set(); + const allUpdateInfos: Array = []; + const allMessageInfos: Array = []; + const allAdditionalMessageInfos: Array = []; + const validResults: Array = []; + + for (const itemResult of batchResults) { + if (itemResult) { + validResults.push(itemResult.result); + itemResult.updates.userIDs.forEach(uid => allUserIDs.add(uid)); + allUpdateInfos.push(...itemResult.updates.updateInfos); + allMessageInfos.push(...itemResult.updates.messageInfos); + allAdditionalMessageInfos.push( + ...itemResult.updates.additionalMessageInfos, + ); + } + } + + results.push(...validResults); + + if ( + allUserIDs.size > 0 || + allUpdateInfos.length > 0 || + allMessageInfos.length > 0 || + allAdditionalMessageInfos.length > 0 + ) { + const payload = { + rawMessageInfos: allMessageInfos, + updateInfos: allUpdateInfos, + userIDs: allUserIDs.size > 0 ? Array.from(allUserIDs) : undefined, + additionalMessageInfos: + allAdditionalMessageInfos.length > 0 + ? allAdditionalMessageInfos + : undefined, + }; + + dispatch({ + type: processFarcasterOpsActionType, + payload, + }); + } + + await sleep(0); } + return results; } -function useFetchConversation(): ( +function useFetchConversationWithBatching(): ( conversationID: string, + batchedUpdates: BatchedUpdates, ) => Promise { const fetchUsersByFIDs = useGetCommFCUsersForFIDs(); const fetchFarcasterConversation = useFetchFarcasterConversation(); - const dispatch = useDispatch(); return React.useCallback( - async (conversationID: string): Promise => { + async ( + conversationID: string, + batchedUpdates: BatchedUpdates, + ): Promise => { try { - const conversationResult = await fetchFarcasterConversation({ - conversationId: conversationID, - }); + const conversationResult = await withRetry( + () => + fetchFarcasterConversation({ + conversationId: conversationID, + }), + MAX_RETRIES, + RETRY_DELAY_MS, + ); if (!conversationResult) { return null; @@ -77,14 +181,6 @@ members: threadMembers, }; - if (threadMembers.length > 0) { - const newUserIDs = threadMembers.map(member => member.id); - dispatch({ - type: processNewUserIDsActionType, - payload: { userIDs: newUserIDs }, - }); - } - const update = { type: updateTypes.JOIN_THREAD, id: uuid.v4(), @@ -95,13 +191,12 @@ rawEntryInfos: [], }; - dispatch({ - type: processFarcasterOpsActionType, - payload: { - rawMessageInfos: [], - updateInfos: [update], - }, - }); + if (threadMembers.length > 0) { + threadMembers.forEach(member => + batchedUpdates.userIDs.add(member.id), + ); + } + batchedUpdates.updateInfos.push(update); return farcasterConversation; } catch (e) { @@ -109,22 +204,23 @@ return null; } }, - [fetchFarcasterConversation, fetchUsersByFIDs, dispatch], + [fetchFarcasterConversation, fetchUsersByFIDs], ); } function useFetchMessagesForConversation(): ( conversationID: string, messagesNumberLimit?: number, + batchedUpdates: BatchedUpdates, ) => Promise { const fetchFarcasterMessages = useFetchFarcasterMessages(); const fetchUsersByFIDs = useGetCommFCUsersForFIDs(); - const dispatch = useDispatch(); return React.useCallback( async ( conversationID: string, messagesNumberLimit: number = 20, + batchedUpdates: BatchedUpdates, ): Promise => { try { let cursor: ?string = null; @@ -146,7 +242,11 @@ ...(cursor ? { cursor } : {}), }; - const messagesResult = await fetchFarcasterMessages(messagesInput); + const messagesResult = await withRetry( + () => fetchFarcasterMessages(messagesInput), + MAX_RETRIES, + RETRY_DELAY_MS, + ); if (messagesResult) { const farcasterMessages = messagesResult.result.messages; @@ -164,55 +264,78 @@ ); if (fcUserInfos.size > 0) { - const newUserIDs = Array.from(fcUserInfos.entries()).map( - ([fid, user]) => user?.userID ?? userIDFromFID(fid), + Array.from(fcUserInfos.entries()).forEach(([fid, user]) => + batchedUpdates.userIDs.add(user?.userID ?? userIDFromFID(fid)), ); - dispatch({ - type: processNewUserIDsActionType, - payload: { userIDs: newUserIDs }, - }); } - if (rawMessageInfos.length > 0) { - const payload = - totalMessagesFetched === 0 - ? { rawMessageInfos, updateInfos: [] } - : { - rawMessageInfos: [], - updateInfos: [], - additionalMessageInfos: rawMessageInfos, - }; - dispatch({ - type: processFarcasterOpsActionType, - payload, - }); - totalMessagesFetched += farcasterMessages.length; + if (totalMessagesFetched === 0) { + batchedUpdates.messageInfos.push(...rawMessageInfos); + } else { + batchedUpdates.additionalMessageInfos.push(...rawMessageInfos); + } } + totalMessagesFetched += farcasterMessages.length; cursor = messagesResult.next?.cursor; } else { cursor = null; } + await sleep(0); } while (cursor && totalMessagesFetched < messagesNumberLimit); } catch (e) { console.error(`Failed fetching messages for ${conversationID}:`, e); } }, - [fetchFarcasterMessages, fetchUsersByFIDs, dispatch], + [fetchFarcasterMessages, fetchUsersByFIDs], ); } function useRefreshFarcasterConversation(): ( conversationID: string, ) => Promise { - const fetchConversation = useFetchConversation(); + const fetchConversation = useFetchConversationWithBatching(); const fetchMessagesForConversation = useFetchMessagesForConversation(); + const dispatch = useDispatch(); + return React.useCallback( async (conversationID: string) => { - await fetchConversation(conversationID); - await fetchMessagesForConversation(conversationID); + const batchedUpdates: BatchedUpdates = { + userIDs: new Set(), + updateInfos: ([]: Array), + messageInfos: ([]: Array), + additionalMessageInfos: ([]: Array), + }; + + await fetchConversation(conversationID, batchedUpdates); + await fetchMessagesForConversation(conversationID, 20, batchedUpdates); + + if ( + batchedUpdates.userIDs.size > 0 || + batchedUpdates.updateInfos.length > 0 || + batchedUpdates.messageInfos.length > 0 || + batchedUpdates.additionalMessageInfos.length > 0 + ) { + const payload = { + rawMessageInfos: batchedUpdates.messageInfos, + updateInfos: batchedUpdates.updateInfos, + userIDs: + batchedUpdates.userIDs.size > 0 + ? Array.from(batchedUpdates.userIDs) + : undefined, + additionalMessageInfos: + batchedUpdates.additionalMessageInfos.length > 0 + ? batchedUpdates.additionalMessageInfos + : undefined, + }; + + dispatch({ + type: processFarcasterOpsActionType, + payload, + }); + } }, - [fetchConversation, fetchMessagesForConversation], + [fetchConversation, fetchMessagesForConversation, dispatch], ); } @@ -227,7 +350,7 @@ const sendFarcasterTextMessage = useSendFarcasterTextMessage(); const dispatch = useDispatch(); const utils = useSendDMOperationUtils(); - const fetchConversation = useFetchConversation(); + const fetchConversation = useFetchConversationWithBatching(); const fetchMessagesForConversation = useFetchMessagesForConversation(); const fetchInboxes: (cursor: ?string) => Promise = React.useCallback( @@ -240,7 +363,11 @@ cursor, }; } - const { result, next } = await fetchFarcasterInbox(input); + const { result, next } = await withRetry( + () => fetchFarcasterInbox(input), + MAX_RETRIES, + RETRY_DELAY_MS, + ); setConversations(prev => { const ids = result.conversations.map( conversation => conversation.conversationId, @@ -271,22 +398,27 @@ void (async () => { const farcasterConversations: Array = []; - const conversationResults = await processInBatches( + const conversationResults = await processInBatchesWithReduxBatching( conversations, FARCASTER_DATA_BATCH_SIZE, - fetchConversation, + (conversationID, batchedUpdates) => + fetchConversation(conversationID, batchedUpdates), + dispatch, ); - farcasterConversations.push(...conversationResults.filter(Boolean)); + const successfulConversations = conversationResults.filter(Boolean); + farcasterConversations.push(...successfulConversations); - await processInBatches( + await processInBatchesWithReduxBatching( farcasterConversations, FARCASTER_DATA_BATCH_SIZE, - conversation => + (conversation, batchedUpdates) => fetchMessagesForConversation( conversation.conversationId, messagesNumberLimit, + batchedUpdates, ), + dispatch, ); setConversations([]); @@ -332,8 +464,51 @@ ); } +function useFetchConversation(): ( + conversationID: string, +) => Promise { + const fetchConversation = useFetchConversationWithBatching(); + const dispatch = useDispatch(); + + return React.useCallback( + async (conversationID: string): Promise => { + const batchedUpdates: BatchedUpdates = { + userIDs: new Set(), + updateInfos: ([]: Array), + messageInfos: ([]: Array), + additionalMessageInfos: ([]: Array), + }; + + const result = await fetchConversation(conversationID, batchedUpdates); + + if ( + batchedUpdates.userIDs.size > 0 || + batchedUpdates.updateInfos.length > 0 + ) { + const payload = { + rawMessageInfos: [], + updateInfos: batchedUpdates.updateInfos, + userIDs: + batchedUpdates.userIDs.size > 0 + ? Array.from(batchedUpdates.userIDs) + : undefined, + }; + + dispatch({ + type: processFarcasterOpsActionType, + payload, + }); + } + + return result; + }, + [fetchConversation, dispatch], + ); +} + export { useFarcasterConversationsSync, + useFetchConversationWithBatching, useFetchConversation, useFetchMessagesForConversation, useRefreshFarcasterConversation,