diff --git a/lib/reducers/master-reducer.js b/lib/reducers/master-reducer.js index 76d2a386a..210606d7f 100644 --- a/lib/reducers/master-reducer.js +++ b/lib/reducers/master-reducer.js @@ -1,260 +1,266 @@ // @flow import { reduceAlertStore } from './alert-reducer.js'; import { reduceAuxUserStore } from './aux-user-reducer.js'; import reduceCalendarFilters from './calendar-filters-reducer.js'; import { reduceCommunityStore } from './community-reducer.js'; import reduceCustomerServer from './custom-server-reducer.js'; import reduceDataLoaded from './data-loaded-reducer.js'; import { reduceDBOpsStore } from './db-ops-reducer.js'; import { reduceDMOperationsQueue } from './dm-operations-queue-reducer.js'; import { reduceDraftStore } from './draft-reducer.js'; import reduceEnabledApps from './enabled-apps-reducer.js'; import { reduceEntryInfos } from './entry-reducer.js'; import { reduceHolderStore } from './holder-reducer.js'; import { reduceIntegrityStore } from './integrity-reducer.js'; import reduceInviteLinks from './invite-links-reducer.js'; import reduceKeyserverStore from './keyserver-reducer.js'; import reduceLifecycleState from './lifecycle-state-reducer.js'; import { reduceLoadingStatuses } from './loading-reducer.js'; import { reduceMessageStore } from './message-reducer.js'; import reduceBaseNavInfo from './nav-reducer.js'; import policiesReducer from './policies-reducer.js'; import reduceReportStore from './report-store-reducer.js'; import { reduceSyncedMetadataStore } from './synced-metadata-reducer.js'; import reduceGlobalThemeInfo from './theme-reducer.js'; import { reduceThreadActivity } from './thread-activity-reducer.js'; import { reduceThreadInfos } from './thread-reducer.js'; import reduceTunnelbrokerDeviceToken from './tunnelbroker-device-token-reducer.js'; import { reduceCurrentUserInfo, reduceUserInfos } from './user-reducer.js'; import { addKeyserverActionType } from '../actions/keyserver-actions.js'; import { legacySiweAuthActionTypes } from '../actions/siwe-actions.js'; import { fetchPendingUpdatesActionTypes } from '../actions/update-actions.js'; import { legacyKeyserverRegisterActionTypes, legacyLogInActionTypes, keyserverAuthActionTypes, } from '../actions/user-actions.js'; import { keyserverStoreOpsHandlers, type ReplaceKeyserverOperation, } from '../ops/keyserver-store-ops.js'; import { isStaff } from '../shared/staff-utils.js'; import { processDMOpsActionType } from '../types/dm-ops.js'; import type { BaseNavInfo } from '../types/nav-types.js'; import type { BaseAppState, BaseAction } from '../types/redux-types.js'; import { fullStateSyncActionType, incrementalStateSyncActionType, } from '../types/socket-types.js'; import type { StoreOperations } from '../types/store-ops-types.js'; import { isDev } from '../utils/dev-utils.js'; export default function baseReducer>( state: T, action: BaseAction, onStateDifference: (message: string) => mixed, ): { state: T, storeOperations: StoreOperations } { + const currentUserInfo = reduceCurrentUserInfo(state.currentUserInfo, action); + const viewerID = + currentUserInfo && !currentUserInfo.anonymous + ? state.currentUserInfo?.id + : null; + const { threadStore, newThreadInconsistencies, threadStoreOperations } = - reduceThreadInfos(state.threadStore, action); + reduceThreadInfos(state.threadStore, action, viewerID); const { threadInfos } = threadStore; const { entryStore, reportCreationRequests: newEntryInconsistencies, entryStoreOperations, } = reduceEntryInfos(state.entryStore, action, threadInfos); const onStateDifferenceForStaff = (message: string) => { const isCurrentUserStaff = state.currentUserInfo?.id ? isStaff(state.currentUserInfo.id) : false; if (isCurrentUserStaff || isDev) { onStateDifference(message); } }; const [userStore, newUserInconsistencies, userStoreOperations] = reduceUserInfos(state.userStore, action); const newInconsistencies = [ ...newEntryInconsistencies, ...newThreadInconsistencies, ...newUserInconsistencies, ]; // Only allow checkpoints to increase if we are connected // or if the action is a STATE_SYNC const { messageStoreOperations, messageStore: reducedMessageStore } = reduceMessageStore(state.messageStore, action, threadInfos); let messageStore = reducedMessageStore; let { keyserverStore, keyserverStoreOperations } = reduceKeyserverStore( state.keyserverStore, action, onStateDifferenceForStaff, ); if ( action.type !== incrementalStateSyncActionType && action.type !== fullStateSyncActionType && action.type !== fetchPendingUpdatesActionTypes.success && action.type !== legacyKeyserverRegisterActionTypes.success && action.type !== legacyLogInActionTypes.success && action.type !== legacySiweAuthActionTypes.success && action.type !== keyserverAuthActionTypes.success && action.type !== addKeyserverActionType ) { const replaceOperations: ReplaceKeyserverOperation[] = []; for (const keyserverID in keyserverStore.keyserverInfos) { if ( keyserverStore.keyserverInfos[keyserverID].connection.status === 'connected' ) { continue; } if ( messageStore.currentAsOf[keyserverID] !== state.messageStore.currentAsOf[keyserverID] ) { messageStore = { ...messageStore, currentAsOf: { ...messageStore.currentAsOf, [keyserverID]: state.messageStore.currentAsOf[keyserverID], }, }; } if ( state.keyserverStore.keyserverInfos[keyserverID] && keyserverStore.keyserverInfos[keyserverID].updatesCurrentAsOf !== state.keyserverStore.keyserverInfos[keyserverID].updatesCurrentAsOf ) { replaceOperations.push({ type: 'replace_keyserver', payload: { id: keyserverID, keyserverInfo: { ...keyserverStore.keyserverInfos[keyserverID], updatesCurrentAsOf: state.keyserverStore.keyserverInfos[keyserverID] .updatesCurrentAsOf, }, }, }); } } keyserverStore = keyserverStoreOpsHandlers.processStoreOperations( keyserverStore, replaceOperations, ); keyserverStoreOperations = [ ...keyserverStoreOperations, ...replaceOperations, ]; } const { draftStore, draftStoreOperations } = reduceDraftStore( state.draftStore, action, ); const { reportStore, reportStoreOperations } = reduceReportStore( state.reportStore, action, newInconsistencies, ); const { communityStore, communityStoreOperations } = reduceCommunityStore( state.communityStore, action, ); const { integrityStore, integrityStoreOperations } = reduceIntegrityStore( state.integrityStore, action, threadInfos, threadStoreOperations, ); const { syncedMetadataStore, syncedMetadataStoreOperations } = reduceSyncedMetadataStore(state.syncedMetadataStore, action); const { auxUserStore, auxUserStoreOperations } = reduceAuxUserStore( state.auxUserStore, action, ); const { threadActivityStore, threadActivityStoreOperations } = reduceThreadActivity(state.threadActivityStore, action); let storeOperations = { draftStoreOperations, threadStoreOperations, messageStoreOperations, reportStoreOperations, userStoreOperations, keyserverStoreOperations, communityStoreOperations, integrityStoreOperations, syncedMetadataStoreOperations, auxUserStoreOperations, threadActivityStoreOperations, entryStoreOperations, }; if ( action.type === processDMOpsActionType && action.payload.outboundP2PMessages ) { storeOperations = { ...storeOperations, outboundP2PMessages: action.payload.outboundP2PMessages, }; } return { state: { ...state, navInfo: reduceBaseNavInfo(state.navInfo, action), draftStore, entryStore, loadingStatuses: reduceLoadingStatuses(state.loadingStatuses, action), - currentUserInfo: reduceCurrentUserInfo(state.currentUserInfo, action), + currentUserInfo, threadStore, userStore, messageStore, calendarFilters: reduceCalendarFilters( state.calendarFilters, action, threadStore, ), alertStore: reduceAlertStore(state.alertStore, action), lifecycleState: reduceLifecycleState(state.lifecycleState, action), enabledApps: reduceEnabledApps(state.enabledApps, action), reportStore, dataLoaded: reduceDataLoaded(state.dataLoaded, action), userPolicies: policiesReducer(state.userPolicies, action), inviteLinksStore: reduceInviteLinks(state.inviteLinksStore, action), keyserverStore, integrityStore, globalThemeInfo: reduceGlobalThemeInfo(state.globalThemeInfo, action), customServer: reduceCustomerServer(state.customServer, action), communityStore, dbOpsStore: reduceDBOpsStore(state.dbOpsStore, action), syncedMetadataStore, auxUserStore, threadActivityStore, tunnelbrokerDeviceToken: reduceTunnelbrokerDeviceToken( state.tunnelbrokerDeviceToken, action, ), queuedDMOperations: reduceDMOperationsQueue( state.queuedDMOperations, action, ), holderStore: reduceHolderStore(state.holderStore, action), }, storeOperations, }; } diff --git a/lib/reducers/message-reducer.js b/lib/reducers/message-reducer.js index 699549426..2fadf2951 100644 --- a/lib/reducers/message-reducer.js +++ b/lib/reducers/message-reducer.js @@ -1,1981 +1,1994 @@ // @flow import invariant from 'invariant'; import _difference from 'lodash/fp/difference.js'; import _flow from 'lodash/fp/flow.js'; import _isEqual from 'lodash/fp/isEqual.js'; import _keyBy from 'lodash/fp/keyBy.js'; import _map from 'lodash/fp/map.js'; import _mapKeys from 'lodash/fp/mapKeys.js'; import _mapValues from 'lodash/fp/mapValues.js'; import _omit from 'lodash/fp/omit.js'; import _omitBy from 'lodash/fp/omitBy.js'; import _pick from 'lodash/fp/pick.js'; import _pickBy from 'lodash/fp/pickBy.js'; import _uniq from 'lodash/fp/uniq.js'; import { setClientDBStoreActionType } from '../actions/client-db-store-actions.js'; import { createEntryActionTypes, saveEntryActionTypes, deleteEntryActionTypes, restoreEntryActionTypes, } from '../actions/entry-actions.js'; import { toggleMessagePinActionTypes, fetchMessagesBeforeCursorActionTypes, fetchMostRecentMessagesActionTypes, sendTextMessageActionTypes, sendMultimediaMessageActionTypes, sendReactionMessageActionTypes, sendEditMessageActionTypes, saveMessagesActionType, processMessagesActionType, messageStorePruneActionType, createLocalMessageActionType, fetchSingleMostRecentMessagesFromThreadsActionTypes, } from '../actions/message-actions.js'; import { sendMessageReportActionTypes } from '../actions/message-report-actions.js'; import { legacySiweAuthActionTypes } from '../actions/siwe-actions.js'; import { changeThreadSettingsActionTypes, deleteThreadActionTypes, leaveThreadActionTypes, newThreadActionTypes, removeUsersFromThreadActionTypes, changeThreadMemberRolesActionTypes, joinThreadActionTypes, } from '../actions/thread-actions.js'; import { fetchPendingUpdatesActionTypes } from '../actions/update-actions.js'; import { updateMultimediaMessageMediaActionType } from '../actions/upload-actions.js'; import { keyserverAuthActionTypes, deleteKeyserverAccountActionTypes, legacyLogInActionTypes, legacyKeyserverRegisterActionTypes, } from '../actions/user-actions.js'; import { setNewSessionActionType } from '../keyserver-conn/keyserver-conn-types.js'; import { messageStoreOpsHandlers, type MessageStoreOperation, type ReplaceMessageOperation, type ReplaceMessageStoreLocalMessageInfoOperation, } from '../ops/message-store-ops.js'; import { pendingToRealizedThreadIDsSelector } from '../selectors/thread-selectors.js'; import { messageID, sortMessageInfoList, sortMessageIDs, mergeThreadMessageInfos, findNewestMessageTimePerKeyserver, localIDPrefix, } from '../shared/message-utils.js'; import { parsePendingThreadID, threadHasPermission, threadInChatList, threadIsPending, } from '../shared/thread-utils.js'; import threadWatcher from '../shared/thread-watcher.js'; import { unshimMessageInfos } from '../shared/unshim-utils.js'; import { updateSpecs } from '../shared/updates/update-specs.js'; import { recoveryFromReduxActionSources } from '../types/account-types.js'; import { processDMOpsActionType } from '../types/dm-ops.js'; import type { Media, Image } from '../types/media-types.js'; import { messageTypes } from '../types/message-types-enum.js'; import { type RawMessageInfo, type LocalMessageInfo, type MessageStore, type MessageTruncationStatus, type MessageTruncationStatuses, messageTruncationStatus, defaultNumberPerThread, type ThreadMessageInfo, } from '../types/message-types.js'; import type { RawImagesMessageInfo } from '../types/messages/images.js'; import type { RawMediaMessageInfo } from '../types/messages/media.js'; import type { RawThreadInfo } from '../types/minimally-encoded-thread-permissions-types.js'; import { type BaseAction } from '../types/redux-types.js'; import { processServerRequestsActionType } from '../types/request-types.js'; import { fullStateSyncActionType, incrementalStateSyncActionType, stateSyncPayloadTypes, } from '../types/socket-types.js'; import { threadPermissions } from '../types/thread-permission-types.js'; import { threadTypeIsThick } from '../types/thread-types-enum.js'; import type { LegacyRawThreadInfo, RawThreadInfos, MixedRawThreadInfos, } from '../types/thread-types.js'; import { type ClientUpdateInfo, processUpdatesActionType, } from '../types/update-types.js'; import { translateClientDBThreadMessageInfos } from '../utils/message-ops-utils.js'; const _mapValuesWithKeys = _mapValues.convert({ cap: false }); // Input must already be ordered! function mapThreadsToMessageIDsFromOrderedMessageInfos( orderedMessageInfos: $ReadOnlyArray, ): { [threadID: string]: string[] } { const threadsToMessageIDs: { [threadID: string]: string[] } = {}; for (const messageInfo of orderedMessageInfos) { const key = messageID(messageInfo); if (!threadsToMessageIDs[messageInfo.threadID]) { threadsToMessageIDs[messageInfo.threadID] = [key]; } else { threadsToMessageIDs[messageInfo.threadID].push(key); } } return threadsToMessageIDs; } function isThreadWatched( threadID: string, threadInfo: ?LegacyRawThreadInfo | ?RawThreadInfo, watchedIDs: $ReadOnlyArray, ) { return ( threadIsPending(threadID) || (threadInfo && threadHasPermission(threadInfo, threadPermissions.VISIBLE) && (threadInChatList(threadInfo) || watchedIDs.includes(threadID) || threadTypeIsThick(threadInfo.type))) ); } const newThread = (): ThreadMessageInfo => ({ messageIDs: [], startReached: false, }); type FreshMessageStoreResult = { +messageStoreOperations: $ReadOnlyArray, +messageStore: MessageStore, }; function freshMessageStore( messageInfos: $ReadOnlyArray, truncationStatus: { [threadID: string]: MessageTruncationStatus }, currentAsOf: { +[keyserverID: string]: number }, threadInfos: MixedRawThreadInfos, ): FreshMessageStoreResult { const unshimmed = unshimMessageInfos(messageInfos); const orderedMessageInfos = sortMessageInfoList(unshimmed); const messages = _keyBy(messageID)(orderedMessageInfos); const messageStoreReplaceOperations = orderedMessageInfos.map( messageInfo => ({ type: 'replace', payload: { id: messageID(messageInfo), messageInfo }, }), ); const threadsToMessageIDs = mapThreadsToMessageIDsFromOrderedMessageInfos(orderedMessageInfos); const threads = _mapValuesWithKeys( (messageIDs: string[], threadID: string) => ({ ...newThread(), messageIDs, startReached: truncationStatus[threadID] === messageTruncationStatus.EXHAUSTIVE, }), )(threadsToMessageIDs); const watchedIDs = threadWatcher.getWatchedIDs(); for (const threadID in threadInfos) { const threadInfo = threadInfos[threadID]; if ( threads[threadID] || !isThreadWatched(threadID, threadInfo, watchedIDs) ) { continue; } threads[threadID] = newThread(); } const messageStoreOperations = [ { type: 'remove_all' }, { type: 'remove_all_threads', }, { type: 'remove_all_local_message_infos', }, { type: 'replace_threads', payload: { threads }, }, ...messageStoreReplaceOperations, ]; return { messageStoreOperations, messageStore: { messages, threads, local: {}, currentAsOf, }, }; } type ReassignmentResult = { +messageStoreOperations: MessageStoreOperation[], +messageStore: MessageStore, +reassignedThreadIDs: string[], }; function reassignMessagesToRealizedThreads( messageStore: MessageStore, threadInfos: RawThreadInfos, ): ReassignmentResult { const pendingToRealizedThreadIDs = pendingToRealizedThreadIDsSelector(threadInfos); const messageStoreOperations: MessageStoreOperation[] = []; const messages: { [string]: RawMessageInfo } = {}; for (const storeMessageID in messageStore.messages) { const message = messageStore.messages[storeMessageID]; const newThreadID = pendingToRealizedThreadIDs.get(message.threadID); messages[storeMessageID] = newThreadID ? { ...message, threadID: newThreadID, time: threadInfos[newThreadID]?.creationTime ?? message.time, } : message; if (!newThreadID) { continue; } const updateMsgOperation: ReplaceMessageOperation = { type: 'replace', payload: { id: storeMessageID, messageInfo: messages[storeMessageID] }, }; messageStoreOperations.push(updateMsgOperation); } const threads: { [string]: ThreadMessageInfo } = {}; const reassignedThreadIDs = []; const updatedThreads: { [string]: ThreadMessageInfo } = {}; const threadsToRemove = []; for (const threadID in messageStore.threads) { const threadMessageInfo = messageStore.threads[threadID]; const newThreadID = pendingToRealizedThreadIDs.get(threadID); if (!newThreadID) { threads[threadID] = threadMessageInfo; continue; } const realizedThread = messageStore.threads[newThreadID]; if (!realizedThread) { reassignedThreadIDs.push(newThreadID); threads[newThreadID] = threadMessageInfo; updatedThreads[newThreadID] = threadMessageInfo; threadsToRemove.push(threadID); continue; } threads[newThreadID] = mergeThreadMessageInfos( threadMessageInfo, realizedThread, messages, ); updatedThreads[newThreadID] = threads[newThreadID]; } if (threadsToRemove.length) { messageStoreOperations.push({ type: 'remove_threads', payload: { ids: threadsToRemove, }, }); } messageStoreOperations.push({ type: 'replace_threads', payload: { threads: updatedThreads, }, }); return { messageStoreOperations, messageStore: { ...messageStore, threads, messages, }, reassignedThreadIDs, }; } type MergeNewMessagesResult = { +messageStoreOperations: $ReadOnlyArray, +messageStore: MessageStore, }; // oldMessageStore is from the old state // newMessageInfos, truncationStatus come from server function mergeNewMessages( oldMessageStore: MessageStore, newMessageInfos: $ReadOnlyArray, truncationStatus: { +[threadID: string]: MessageTruncationStatus }, threadInfos: RawThreadInfos, ): MergeNewMessagesResult { const { messageStoreOperations: updateWithLatestThreadInfosOps, messageStore: messageStoreUpdatedWithLatestThreadInfos, reassignedThreadIDs, } = updateMessageStoreWithLatestThreadInfos(oldMessageStore, threadInfos); const messageStoreAfterUpdateOps = processMessageStoreOperations( oldMessageStore, updateWithLatestThreadInfosOps, ); const updatedMessageStore = { ...messageStoreUpdatedWithLatestThreadInfos, messages: messageStoreAfterUpdateOps.messages, threads: messageStoreAfterUpdateOps.threads, local: messageStoreAfterUpdateOps.local, }; const localIDsToServerIDs: Map = new Map(); const watchedThreadIDs = [ ...threadWatcher.getWatchedIDs(), ...reassignedThreadIDs, ]; const unshimmedNewMessages = unshimMessageInfos(newMessageInfos); const unshimmedNewMessagesOfWatchedThreads = unshimmedNewMessages.filter( msg => isThreadWatched( msg.threadID, threadInfos[msg.threadID], watchedThreadIDs, ), ); const orderedNewMessageInfos = _flow( _map((messageInfo: RawMessageInfo) => { const { id: inputID } = messageInfo; invariant(inputID, 'new messageInfos should have serverID'); invariant( !threadIsPending(messageInfo.threadID), 'new messageInfos should have realized thread id', ); const currentMessageInfo = updatedMessageStore.messages[inputID]; if ( messageInfo.type === messageTypes.TEXT || messageInfo.type === messageTypes.IMAGES || messageInfo.type === messageTypes.MULTIMEDIA ) { const { localID: inputLocalID } = messageInfo; const currentLocalMessageInfo = inputLocalID ? updatedMessageStore.messages[inputLocalID] : null; if (currentMessageInfo && currentMessageInfo.localID) { // If the client already has a RawMessageInfo with this serverID, keep // any localID associated with the existing one. This is because we // use localIDs as React keys and changing React keys leads to loss of // component state. (The conditional below is for Flow) if (messageInfo.type === messageTypes.TEXT) { messageInfo = { ...messageInfo, localID: currentMessageInfo.localID, }; } else if (messageInfo.type === messageTypes.MULTIMEDIA) { messageInfo = ({ ...messageInfo, localID: currentMessageInfo.localID, }: RawMediaMessageInfo); } else { messageInfo = ({ ...messageInfo, localID: currentMessageInfo.localID, }: RawImagesMessageInfo); } } else if (currentLocalMessageInfo && currentLocalMessageInfo.localID) { // If the client has a RawMessageInfo with this localID, but not with // the serverID, that means the message creation succeeded but the // success action never got processed. We set a key in // localIDsToServerIDs here to fix the messageIDs for the rest of the // MessageStore too. (The conditional below is for Flow) invariant(inputLocalID, 'inputLocalID should be set'); localIDsToServerIDs.set(inputLocalID, inputID); if (messageInfo.type === messageTypes.TEXT) { messageInfo = { ...messageInfo, localID: currentLocalMessageInfo.localID, }; } else if (messageInfo.type === messageTypes.MULTIMEDIA) { messageInfo = ({ ...messageInfo, localID: currentLocalMessageInfo.localID, }: RawMediaMessageInfo); } else { messageInfo = ({ ...messageInfo, localID: currentLocalMessageInfo.localID, }: RawImagesMessageInfo); } } else { // If neither the serverID nor the localID from the delivered // RawMessageInfo exists in the client store, then this message is // brand new to us. Ignore any localID provided by the server. // (The conditional below is for Flow) const { localID, ...rest } = messageInfo; if (rest.type === messageTypes.TEXT) { messageInfo = { ...rest }; } else if (rest.type === messageTypes.MULTIMEDIA) { messageInfo = ({ ...rest }: RawMediaMessageInfo); } else { messageInfo = ({ ...rest }: RawImagesMessageInfo); } } } else if ( currentMessageInfo && messageInfo.time > currentMessageInfo.time ) { // When thick threads will be introduced it will be possible for two // clients to create the same message (e.g. when they create the same // sidebar at the same time). We're going to use deterministic ids for // messages which should be unique within a thread and we have to find // a way for clients to agree which message to keep. We can't rely on // always choosing incoming messages nor messages from the store, // because a message that is in one user's store, will be send to // another user. One way to deal with it is to always choose a message // which is older, according to its timestamp. We can use this strategy // only for messages that can start a thread, because for other types // it might break the "contiguous" property of message ids (we can // consider selecting younger messages in that case, but for now we use // an invariant). invariant( messageInfo.type === messageTypes.CREATE_SIDEBAR || messageInfo.type === messageTypes.CREATE_THREAD || messageInfo.type === messageTypes.SIDEBAR_SOURCE, `Two different messages of type ${messageInfo.type} with the same ` + 'id found', ); return currentMessageInfo; } return _isEqual(messageInfo)(currentMessageInfo) ? currentMessageInfo : messageInfo; }), sortMessageInfoList, )(unshimmedNewMessagesOfWatchedThreads); const newMessageOps: MessageStoreOperation[] = []; const threadsToMessageIDs = mapThreadsToMessageIDsFromOrderedMessageInfos( orderedNewMessageInfos, ); const oldMessageInfosToCombine = []; const threadsThatNeedMessageIDsResorted = []; const updatedThreads: { [string]: ThreadMessageInfo } = {}; const threads = _flow( _mapValuesWithKeys((messageIDs: string[], threadID: string) => { const oldThread = updatedMessageStore.threads[threadID]; const truncate = truncationStatus[threadID]; if (!oldThread) { updatedThreads[threadID] = { ...newThread(), messageIDs, startReached: truncate === messageTruncationStatus.EXHAUSTIVE, }; return updatedThreads[threadID]; } let oldMessageIDsUnchanged = true; const oldMessageIDs = oldThread.messageIDs.map(oldID => { const newID = localIDsToServerIDs.get(oldID); if (newID !== null && newID !== undefined) { oldMessageIDsUnchanged = false; return newID; } return oldID; }); if (truncate === messageTruncationStatus.TRUNCATED) { // If the result set in the payload isn't contiguous with what we have // now, that means we need to dump what we have in the state and replace // it with the result set. We do this to achieve our two goals for the // messageStore: currentness and contiguousness. newMessageOps.push({ type: 'remove_messages_for_threads', payload: { threadIDs: [threadID] }, }); updatedThreads[threadID] = { messageIDs, startReached: false, }; return updatedThreads[threadID]; } const oldNotInNew = _difference(oldMessageIDs)(messageIDs); for (const id of oldNotInNew) { const oldMessageInfo = updatedMessageStore.messages[id]; invariant(oldMessageInfo, `could not find ${id} in messageStore`); oldMessageInfosToCombine.push(oldMessageInfo); const localInfo = updatedMessageStore.local[id]; if (localInfo) { newMessageOps.push({ type: 'replace_local_message_info', payload: { id, localMessageInfo: localInfo, }, }); } } const startReached = oldThread.startReached || truncate === messageTruncationStatus.EXHAUSTIVE; if (_difference(messageIDs)(oldMessageIDs).length === 0) { if (startReached === oldThread.startReached && oldMessageIDsUnchanged) { return oldThread; } updatedThreads[threadID] = { messageIDs: oldMessageIDs, startReached, }; return updatedThreads[threadID]; } const mergedMessageIDs = [...messageIDs, ...oldNotInNew]; threadsThatNeedMessageIDsResorted.push(threadID); return { messageIDs: mergedMessageIDs, startReached, }; }), _pickBy(thread => !!thread), )(threadsToMessageIDs); for (const threadID in updatedMessageStore.threads) { if (threads[threadID]) { continue; } let thread = updatedMessageStore.threads[threadID]; const truncate = truncationStatus[threadID]; if (truncate === messageTruncationStatus.EXHAUSTIVE) { thread = { ...thread, startReached: true, }; } threads[threadID] = thread; updatedThreads[threadID] = thread; for (const id of thread.messageIDs) { const messageInfo = updatedMessageStore.messages[id]; if (messageInfo) { oldMessageInfosToCombine.push(messageInfo); } const localInfo = updatedMessageStore.local[id]; if (localInfo) { newMessageOps.push({ type: 'replace_local_message_info', payload: { id, localMessageInfo: localInfo, }, }); } } } const messages = _flow( sortMessageInfoList, _keyBy(messageID), )([...orderedNewMessageInfos, ...oldMessageInfosToCombine]); const newMessages = _keyBy(messageID)(orderedNewMessageInfos); for (const id in newMessages) { newMessageOps.push({ type: 'replace', payload: { id, messageInfo: newMessages[id] }, }); } if (localIDsToServerIDs.size > 0) { newMessageOps.push({ type: 'remove', payload: { ids: [...localIDsToServerIDs.keys()] }, }); } for (const threadID of threadsThatNeedMessageIDsResorted) { threads[threadID].messageIDs = sortMessageIDs(messages)( threads[threadID].messageIDs, ); updatedThreads[threadID] = threads[threadID]; } // We don't want to persist an information about startReached for thick // thread containing more than defaultNumberPerThread messages. The reason // is that after closing and reopening the app, we're fetching only a subset // of messages. If we read startReached = true from the DB, we won't attempt // fetching more batches of messages. const dbOps = [ ...newMessageOps, { type: 'replace_threads', payload: { threads: Object.fromEntries( Object.entries(updatedThreads).map(([threadID, thread]) => { let isThreadThick; if (threadInfos[threadID]) { isThreadThick = threadInfos[threadID].thick; } else { const threadIDParseResult = parsePendingThreadID(threadID); isThreadThick = threadIDParseResult ? threadTypeIsThick(threadIDParseResult.threadType) : false; } return [ threadID, isThreadThick ? { ...thread, startReached: thread.messageIDs.length < defaultNumberPerThread, } : thread, ]; }), ), }, }, ]; newMessageOps.push({ type: 'replace_threads', payload: { threads: updatedThreads, }, }); const processedMessageStore = processMessageStoreOperations( updatedMessageStore, newMessageOps, ); const currentAsOf: { [keyserverID: string]: number } = {}; const newestMessageTimePerKeyserver = findNewestMessageTimePerKeyserver( orderedNewMessageInfos, ); for (const keyserverID in newestMessageTimePerKeyserver) { currentAsOf[keyserverID] = Math.max( newestMessageTimePerKeyserver[keyserverID], processedMessageStore.currentAsOf[keyserverID] ?? 0, ); } const messageStore = { messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, currentAsOf: { ...processedMessageStore.currentAsOf, ...currentAsOf, }, }; return { messageStoreOperations: [...updateWithLatestThreadInfosOps, ...dbOps], messageStore, }; } type UpdateMessageStoreWithLatestThreadInfosResult = { +messageStoreOperations: MessageStoreOperation[], +messageStore: MessageStore, +reassignedThreadIDs: string[], }; function updateMessageStoreWithLatestThreadInfos( messageStore: MessageStore, threadInfos: RawThreadInfos, ): UpdateMessageStoreWithLatestThreadInfosResult { const messageStoreOperations: MessageStoreOperation[] = []; const { messageStore: reassignedMessageStore, messageStoreOperations: reassignMessagesOps, reassignedThreadIDs, } = reassignMessagesToRealizedThreads(messageStore, threadInfos); messageStoreOperations.push(...reassignMessagesOps); const watchedIDs = [...threadWatcher.getWatchedIDs(), ...reassignedThreadIDs]; const filteredThreads: { [string]: ThreadMessageInfo } = {}; const threadsToRemoveMessagesFrom = []; const messageIDsToRemove: string[] = []; for (const threadID in reassignedMessageStore.threads) { const threadMessageInfo = reassignedMessageStore.threads[threadID]; const threadInfo = threadInfos[threadID]; if (isThreadWatched(threadID, threadInfo, watchedIDs)) { filteredThreads[threadID] = threadMessageInfo; } else { threadsToRemoveMessagesFrom.push(threadID); messageIDsToRemove.push(...threadMessageInfo.messageIDs); } } const updatedThreads: { [string]: ThreadMessageInfo } = {}; for (const threadID in threadInfos) { const threadInfo = threadInfos[threadID]; if ( isThreadWatched(threadID, threadInfo, watchedIDs) && !filteredThreads[threadID] ) { filteredThreads[threadID] = newThread(); updatedThreads[threadID] = filteredThreads[threadID]; } } messageStoreOperations.push({ type: 'remove_threads', payload: { ids: threadsToRemoveMessagesFrom }, }); messageStoreOperations.push({ type: 'replace_threads', payload: { threads: updatedThreads, }, }); messageStoreOperations.push({ type: 'remove_messages_for_threads', payload: { threadIDs: threadsToRemoveMessagesFrom }, }); const localMessagesToRemove = _pick(messageIDsToRemove)( reassignedMessageStore.local, ); messageStoreOperations.push({ type: 'remove_local_message_infos', payload: { ids: Object.keys(localMessagesToRemove) }, }); return { messageStoreOperations, messageStore: { messages: _omit(messageIDsToRemove)(reassignedMessageStore.messages), threads: filteredThreads, local: _omit(messageIDsToRemove)(reassignedMessageStore.local), currentAsOf: reassignedMessageStore.currentAsOf, }, reassignedThreadIDs, }; } function ensureRealizedThreadIDIsUsedWhenPossible( payload: T, threadInfos: RawThreadInfos, ): T { const pendingToRealizedThreadIDs = pendingToRealizedThreadIDsSelector(threadInfos); const realizedThreadID = pendingToRealizedThreadIDs.get(payload.threadID); return realizedThreadID ? { ...payload, threadID: realizedThreadID } : payload; } const { processStoreOperations: processMessageStoreOperations } = messageStoreOpsHandlers; type ReduceMessageStoreResult = { +messageStoreOperations: $ReadOnlyArray, +messageStore: MessageStore, }; function reduceMessageStore( messageStore: MessageStore, action: BaseAction, newThreadInfos: RawThreadInfos, ): ReduceMessageStoreResult { if ( action.type === legacyLogInActionTypes.success || action.type === legacySiweAuthActionTypes.success ) { const { messagesResult } = action.payload; let { messageInfos } = messagesResult; // If it's a resolution attempt and the userID doesn't change, // then we should keep all local messages in the store // TODO we can't check if the userID changed until ENG-6126 if ( action.payload.authActionSource === recoveryFromReduxActionSources.cookieInvalidationResolutionAttempt || action.payload.authActionSource === recoveryFromReduxActionSources.socketAuthErrorResolutionAttempt ) { const localMessages = Object.values(messageStore.messages).filter( rawMessageInfo => messageID(rawMessageInfo).startsWith(localIDPrefix), ); messageInfos = [...messageInfos, ...localMessages]; } const { messageStoreOperations, messageStore: freshStore } = freshMessageStore( messageInfos, messagesResult.truncationStatus, messagesResult.currentAsOf, newThreadInfos, ); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { ...freshStore, messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, }, }; } else if (action.type === keyserverAuthActionTypes.success) { const { messagesResult } = action.payload; return mergeNewMessages( messageStore, messagesResult.messageInfos, messagesResult.truncationStatus, newThreadInfos, ); } else if (action.type === incrementalStateSyncActionType) { const { messagesResult, updatesResult } = action.payload; if ( messagesResult.rawMessageInfos.length === 0 && updatesResult.newUpdates.length === 0 ) { return { messageStoreOperations: [], messageStore }; } const newMessagesResult = mergeUpdatesWithMessageInfos( messagesResult.rawMessageInfos, updatesResult.newUpdates, messagesResult.truncationStatuses, ); return mergeNewMessages( messageStore, newMessagesResult.rawMessageInfos, newMessagesResult.truncationStatuses, newThreadInfos, ); } else if ( action.type === fetchPendingUpdatesActionTypes.success && action.payload.type === stateSyncPayloadTypes.INCREMENTAL ) { const { messagesResult, updatesResult } = action.payload; if ( messagesResult.rawMessageInfos.length === 0 && updatesResult.newUpdates.length === 0 ) { return { messageStoreOperations: [], messageStore }; } const newMessagesResult = mergeUpdatesWithMessageInfos( messagesResult.rawMessageInfos, updatesResult.newUpdates, messagesResult.truncationStatuses, ); return mergeNewMessages( messageStore, newMessagesResult.rawMessageInfos, newMessagesResult.truncationStatuses, newThreadInfos, ); } else if (action.type === processUpdatesActionType) { if (action.payload.updatesResult.newUpdates.length === 0) { return { messageStoreOperations: [], messageStore }; } const messagesResult = mergeUpdatesWithMessageInfos( [], action.payload.updatesResult.newUpdates, ); const { messageStoreOperations, messageStore: newMessageStore } = mergeNewMessages( messageStore, messagesResult.rawMessageInfos, messagesResult.truncationStatuses, newThreadInfos, ); return { messageStoreOperations, messageStore: { messages: newMessageStore.messages, threads: newMessageStore.threads, local: newMessageStore.local, currentAsOf: messageStore.currentAsOf, }, }; } else if ( action.type === fullStateSyncActionType || action.type === processMessagesActionType || (action.type === fetchPendingUpdatesActionTypes.success && action.payload.type === stateSyncPayloadTypes.FULL) ) { const { messagesResult } = action.payload; return mergeNewMessages( messageStore, messagesResult.rawMessageInfos, messagesResult.truncationStatuses, newThreadInfos, ); } else if ( action.type === fetchSingleMostRecentMessagesFromThreadsActionTypes.success ) { return mergeNewMessages( messageStore, action.payload.rawMessageInfos, action.payload.truncationStatuses, newThreadInfos, ); } else if ( action.type === fetchMessagesBeforeCursorActionTypes.success || action.type === fetchMostRecentMessagesActionTypes.success ) { return mergeNewMessages( messageStore, action.payload.rawMessageInfos, { [action.payload.threadID]: action.payload.truncationStatus }, newThreadInfos, ); } else if (action.type === deleteKeyserverAccountActionTypes.success) { const { messageStoreOperations, messageStore: filteredMessageStore } = updateMessageStoreWithLatestThreadInfos(messageStore, newThreadInfos); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); let currentAsOf = {}; if (action.payload.keyserverIDs) { currentAsOf = _omit(action.payload.keyserverIDs)( filteredMessageStore.currentAsOf, ); } return { messageStoreOperations, messageStore: { ...filteredMessageStore, currentAsOf, messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, }, }; } else if ( action.type === deleteThreadActionTypes.success || action.type === leaveThreadActionTypes.success || action.type === setNewSessionActionType ) { const { messageStoreOperations, messageStore: filteredMessageStore } = updateMessageStoreWithLatestThreadInfos(messageStore, newThreadInfos); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { ...filteredMessageStore, messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, }, }; } else if (action.type === newThreadActionTypes.success) { const messagesResult = mergeUpdatesWithMessageInfos( action.payload.newMessageInfos, action.payload.updatesResult.newUpdates, ); return mergeNewMessages( messageStore, messagesResult.rawMessageInfos, messagesResult.truncationStatuses, newThreadInfos, ); } else if (action.type === sendMessageReportActionTypes.success) { const { messageInfo } = action.payload; if (messageInfo) { return mergeNewMessages( messageStore, [messageInfo], { [(messageInfo.threadID: string)]: messageTruncationStatus.UNCHANGED, }, newThreadInfos, ); } } else if (action.type === legacyKeyserverRegisterActionTypes.success) { const truncationStatuses: { [string]: MessageTruncationStatus } = {}; for (const messageInfo of action.payload.rawMessageInfos) { truncationStatuses[messageInfo.threadID] = messageTruncationStatus.EXHAUSTIVE; } return mergeNewMessages( messageStore, action.payload.rawMessageInfos, truncationStatuses, newThreadInfos, ); } else if ( action.type === changeThreadSettingsActionTypes.success || action.type === removeUsersFromThreadActionTypes.success || action.type === changeThreadMemberRolesActionTypes.success || action.type === createEntryActionTypes.success || action.type === saveEntryActionTypes.success || action.type === restoreEntryActionTypes.success || action.type === toggleMessagePinActionTypes.success ) { return mergeNewMessages( messageStore, action.payload.newMessageInfos, { [(action.payload.threadID: string)]: messageTruncationStatus.UNCHANGED, }, newThreadInfos, ); } else if (action.type === deleteEntryActionTypes.success) { const payload = action.payload; if (payload) { return mergeNewMessages( messageStore, payload.newMessageInfos, { [payload.threadID]: messageTruncationStatus.UNCHANGED }, newThreadInfos, ); } } else if (action.type === joinThreadActionTypes.success) { const messagesResult = mergeUpdatesWithMessageInfos( action.payload.rawMessageInfos, action.payload.updatesResult.newUpdates, ); return mergeNewMessages( messageStore, messagesResult.rawMessageInfos, messagesResult.truncationStatuses, newThreadInfos, ); } else if (action.type === sendEditMessageActionTypes.success) { const { newMessageInfos } = action.payload; const truncationStatuses: { [string]: MessageTruncationStatus } = {}; for (const messageInfo of newMessageInfos) { truncationStatuses[messageInfo.threadID] = messageTruncationStatus.UNCHANGED; } return mergeNewMessages( messageStore, newMessageInfos, truncationStatuses, newThreadInfos, ); } else if ( action.type === sendTextMessageActionTypes.started || action.type === sendMultimediaMessageActionTypes.started || action.type === sendReactionMessageActionTypes.started ) { const payload = ensureRealizedThreadIDIsUsedWhenPossible( action.payload, newThreadInfos, ); const { localID, threadID } = payload; invariant(localID, `localID should be set on ${action.type}`); const messageIDs = messageStore.threads[threadID]?.messageIDs ?? []; if (!messageStore.messages[localID]) { for (const existingMessageID of messageIDs) { const existingMessageInfo = messageStore.messages[existingMessageID]; if (existingMessageInfo && existingMessageInfo.localID === localID) { return { messageStoreOperations: [], messageStore }; } } } const messageStoreOperations: MessageStoreOperation[] = [ { type: 'replace', payload: { id: localID, messageInfo: payload }, }, ]; let updatedThreads; if (messageStore.messages[localID]) { const messages = { ...messageStore.messages, [(localID: string)]: payload, }; const thread = messageStore.threads[threadID]; updatedThreads = { [(threadID: string)]: { messageIDs: sortMessageIDs(messages)(messageIDs), startReached: thread?.startReached ?? true, }, }; messageStoreOperations.push({ type: 'remove_local_message_infos', payload: { ids: [localID], }, }); } else { updatedThreads = { [(threadID: string)]: messageStore.threads[threadID] ? { ...messageStore.threads[threadID], messageIDs: [localID, ...messageIDs], } : { messageIDs: [localID], startReached: true, }, }; } messageStoreOperations.push({ type: 'replace_threads', payload: { threads: { ...updatedThreads }, }, }); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); const newMessageStore = { messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, currentAsOf: messageStore.currentAsOf, }; return { messageStoreOperations, messageStore: newMessageStore, }; } else if ( action.type === sendTextMessageActionTypes.failed || action.type === sendMultimediaMessageActionTypes.failed ) { const { localID, failedOutboundP2PMessageIDs } = action.payload; let newLocalMessageInfo = { sendFailed: true, }; // For thick threads, we need to keep track of all not sent P2P messages if (failedOutboundP2PMessageIDs && failedOutboundP2PMessageIDs.length > 0) { newLocalMessageInfo = { ...newLocalMessageInfo, outboundP2PMessageIDs: failedOutboundP2PMessageIDs, }; } const messageStoreOperations = [ { type: 'replace_local_message_info', payload: { id: localID, localMessageInfo: newLocalMessageInfo, }, }, ]; const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, currentAsOf: messageStore.currentAsOf, }, }; } else if (action.type === sendReactionMessageActionTypes.failed) { const { localID, threadID } = action.payload; const messageStoreOperations: MessageStoreOperation[] = []; messageStoreOperations.push({ type: 'remove', payload: { ids: [localID] }, }); const newMessageIDs = messageStore.threads[threadID].messageIDs.filter( id => id !== localID, ); const updatedThreads = { [threadID]: { ...messageStore.threads[threadID], messageIDs: newMessageIDs, }, }; messageStoreOperations.push({ type: 'replace_threads', payload: { threads: updatedThreads }, }); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: processedMessageStore, }; } else if ( action.type === sendTextMessageActionTypes.success || action.type === sendMultimediaMessageActionTypes.success || action.type === sendReactionMessageActionTypes.success ) { const { payload } = action; invariant( !threadIsPending(payload.threadID), 'Successful message action should have realized thread id', ); const replaceMessageKey = (messageKey: string) => messageKey === payload.localID ? payload.serverID : messageKey; let newMessages; const messageStoreOperations: MessageStoreOperation[] = []; if (messageStore.messages[payload.serverID]) { // If somehow the serverID got in there already, we'll just update the // serverID message and scrub the localID one newMessages = _omitBy( (messageInfo: RawMessageInfo) => messageInfo.type === messageTypes.TEXT && !messageInfo.id && messageInfo.localID === payload.localID, )(messageStore.messages); messageStoreOperations.push({ type: 'remove', payload: { ids: [payload.localID] }, }); } else if (messageStore.messages[payload.localID]) { // The normal case, the localID message gets replaced by the serverID one newMessages = _mapKeys(replaceMessageKey)(messageStore.messages); messageStoreOperations.push({ type: 'rekey', payload: { from: payload.localID, to: payload.serverID }, }); } else { // Well this is weird, we probably got deauthorized between when the // action was dispatched and when we ran this reducer... return { messageStoreOperations, messageStore }; } const newMessage = { ...newMessages[payload.serverID], id: payload.serverID, localID: payload.localID, time: payload.time, }; newMessages[payload.serverID] = newMessage; messageStoreOperations.push({ type: 'replace', payload: { id: payload.serverID, messageInfo: newMessage }, }); const threadID = payload.threadID; const newMessageIDs = _flow( _uniq, sortMessageIDs(newMessages), )(messageStore.threads[threadID].messageIDs.map(replaceMessageKey)); messageStoreOperations.push({ type: 'remove_local_message_infos', payload: { ids: [payload.localID], }, }); const updatedThreads = { [threadID]: { ...messageStore.threads[threadID], messageIDs: newMessageIDs, }, }; messageStoreOperations.push({ type: 'replace_threads', payload: { threads: updatedThreads }, }); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { ...messageStore, messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, }, }; } else if (action.type === saveMessagesActionType) { const truncationStatuses: { [string]: MessageTruncationStatus } = {}; for (const messageInfo of action.payload.rawMessageInfos) { truncationStatuses[messageInfo.threadID] = messageTruncationStatus.UNCHANGED; } const { messageStoreOperations, messageStore: newMessageStore } = mergeNewMessages( messageStore, action.payload.rawMessageInfos, truncationStatuses, newThreadInfos, ); return { messageStoreOperations, messageStore: { messages: newMessageStore.messages, threads: newMessageStore.threads, local: newMessageStore.local, // We avoid bumping currentAsOf because notifs may include a contracted // RawMessageInfo, so we want to make sure we still fetch it currentAsOf: messageStore.currentAsOf, }, }; } else if (action.type === messageStorePruneActionType) { const reduxMessageIDsToPrune: Array = []; const dbMessageIDsToPrune: Array = []; const updatedThreads: { [string]: ThreadMessageInfo } = {}; for (const threadID of action.payload.threadIDs) { let thread = messageStore.threads[threadID]; if (!thread) { continue; } const newMessageIDs = [...thread.messageIDs]; const removed = newMessageIDs.splice(defaultNumberPerThread); if (removed.length > 0) { thread = { ...thread, messageIDs: newMessageIDs, startReached: false, }; } reduxMessageIDsToPrune.push(...removed); if (!newThreadInfos[threadID]?.thick) { dbMessageIDsToPrune.push(...removed); } updatedThreads[threadID] = thread; } const createMessageStoreOperations = ( messageIDsToPrune: $ReadOnlyArray, ) => { const localMessageIDsToRemove = _pick(messageIDsToPrune)( messageStore.local, ); return [ { type: 'remove', payload: { ids: messageIDsToPrune }, }, { type: 'replace_threads', payload: { threads: updatedThreads, }, }, { type: 'remove_local_message_infos', payload: { ids: Object.keys(localMessageIDsToRemove) }, }, ]; }; const reduxOperations = createMessageStoreOperations( reduxMessageIDsToPrune, ); const processedMessageStore = processMessageStoreOperations( messageStore, reduxOperations, ); const newMessageStore = { messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, currentAsOf: messageStore.currentAsOf, }; const dbOperations = createMessageStoreOperations(dbMessageIDsToPrune); return { messageStoreOperations: dbOperations, messageStore: newMessageStore, }; } else if (action.type === updateMultimediaMessageMediaActionType) { const { messageID: id, currentMediaID, mediaUpdate } = action.payload; const message = messageStore.messages[id]; invariant(message, `message with ID ${id} could not be found`); invariant( message.type === messageTypes.IMAGES || message.type === messageTypes.MULTIMEDIA, `message with ID ${id} is not multimedia`, ); let updatedMessage; let replaced = false; if (message.type === messageTypes.IMAGES) { const media: Image[] = []; for (const singleMedia of message.media) { if (singleMedia.id !== currentMediaID) { media.push(singleMedia); } else { let updatedMedia: Image = { id: mediaUpdate.id ?? singleMedia.id, type: 'photo', uri: mediaUpdate.uri ?? singleMedia.uri, dimensions: mediaUpdate.dimensions ?? singleMedia.dimensions, thumbHash: mediaUpdate.thumbHash ?? singleMedia.thumbHash, }; if ( 'localMediaSelection' in singleMedia && !('localMediaSelection' in mediaUpdate) ) { updatedMedia = { ...updatedMedia, localMediaSelection: singleMedia.localMediaSelection, }; } else if (mediaUpdate.localMediaSelection) { updatedMedia = { ...updatedMedia, localMediaSelection: mediaUpdate.localMediaSelection, }; } media.push(updatedMedia); replaced = true; } } updatedMessage = { ...message, media }; } else { const media: Media[] = []; for (const singleMedia of message.media) { if (singleMedia.id !== currentMediaID) { media.push(singleMedia); } else if ( singleMedia.type === 'photo' && mediaUpdate.type === 'photo' ) { media.push({ ...singleMedia, ...mediaUpdate }); replaced = true; } else if ( singleMedia.type === 'video' && mediaUpdate.type === 'video' ) { media.push({ ...singleMedia, ...mediaUpdate }); replaced = true; } else if ( singleMedia.type === 'encrypted_photo' && mediaUpdate.type === 'encrypted_photo' ) { if (singleMedia.blobURI) { const { holder, ...rest } = mediaUpdate; if (holder) { console.log( `mediaUpdate contained holder for media ${singleMedia.id} ` + 'that has blobURI', ); } media.push({ ...singleMedia, ...rest }); } else { invariant( singleMedia.holder, 'Encrypted media must have holder or blobURI', ); const { blobURI, ...rest } = mediaUpdate; if (blobURI) { console.log( `mediaUpdate contained blobURI for media ${singleMedia.id} ` + 'that has holder', ); } media.push({ ...singleMedia, ...rest }); } replaced = true; } else if ( singleMedia.type === 'encrypted_video' && mediaUpdate.type === 'encrypted_video' ) { if (singleMedia.blobURI) { const { holder, thumbnailHolder, ...rest } = mediaUpdate; if (holder || thumbnailHolder) { console.log( 'mediaUpdate contained holder or thumbnailHolder for media ' + `${singleMedia.id} that has blobURI`, ); } media.push({ ...singleMedia, ...rest }); } else { invariant( singleMedia.holder, 'Encrypted media must have holder or blobURI', ); const { blobURI, thumbnailBlobURI, ...rest } = mediaUpdate; if (blobURI || thumbnailBlobURI) { console.log( 'mediaUpdate contained blobURI or thumbnailBlobURI for media ' + `${singleMedia.id} that has holder`, ); } media.push({ ...singleMedia, ...rest }); } replaced = true; } else if ( singleMedia.type === 'photo' && mediaUpdate.type === 'encrypted_photo' ) { // extract fields that are absent in encrypted_photo type const { uri, localMediaSelection, ...original } = singleMedia; const { holder: newHolder, blobURI: newBlobURI, encryptionKey, ...update } = mediaUpdate; const blobURI = newBlobURI ?? newHolder; invariant( blobURI && encryptionKey, 'holder and encryptionKey are required for encrypted_photo message', ); media.push({ ...original, ...update, type: 'encrypted_photo', blobURI, encryptionKey, }); replaced = true; } else if ( singleMedia.type === 'video' && mediaUpdate.type === 'encrypted_video' ) { const { uri, thumbnailURI, localMediaSelection, ...original } = singleMedia; const { holder: newHolder, blobURI: newBlobURI, encryptionKey, thumbnailHolder: newThumbnailHolder, thumbnailBlobURI: newThumbnailBlobURI, thumbnailEncryptionKey, ...update } = mediaUpdate; const blobURI = newBlobURI ?? newHolder; invariant( blobURI && encryptionKey, 'holder and encryptionKey are required for encrypted_video message', ); const thumbnailBlobURI = newThumbnailBlobURI ?? newThumbnailHolder; invariant( thumbnailBlobURI && thumbnailEncryptionKey, 'thumbnailHolder and thumbnailEncryptionKey are required for ' + 'encrypted_video message', ); media.push({ ...original, ...update, type: 'encrypted_video', blobURI, encryptionKey, thumbnailBlobURI, thumbnailEncryptionKey, }); replaced = true; } else if (mediaUpdate.id) { const { id: newID } = mediaUpdate; media.push({ ...singleMedia, id: newID }); replaced = true; } } updatedMessage = { ...message, media }; } invariant( replaced, `message ${id} did not contain media with ID ${currentMediaID}`, ); const messageStoreOperations = [ { type: 'replace', payload: { id, messageInfo: updatedMessage, }, }, ]; const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { ...messageStore, messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, }, }; } else if (action.type === createLocalMessageActionType) { const messageInfo = ensureRealizedThreadIDIsUsedWhenPossible( action.payload, newThreadInfos, ); const { localID, threadID } = messageInfo; const messageIDs = messageStore.threads[messageInfo.threadID]?.messageIDs ?? []; const threadState: ThreadMessageInfo = messageStore.threads[threadID] ? { ...messageStore.threads[threadID], messageIDs: [localID, ...messageIDs], } : { messageIDs: [localID], startReached: true, }; const messageStoreOperations = [ { type: 'replace', payload: { id: localID, messageInfo }, }, { type: 'replace_threads', payload: { threads: { [(threadID: string)]: threadState }, }, }, ]; const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { ...messageStore, threads: processedMessageStore.threads, messages: processedMessageStore.messages, local: processedMessageStore.local, }, }; } else if (action.type === processServerRequestsActionType) { const { messageStoreOperations, messageStore: messageStoreAfterReassignment, } = reassignMessagesToRealizedThreads(messageStore, newThreadInfos); const processedMessageStore = processMessageStoreOperations( messageStore, messageStoreOperations, ); return { messageStoreOperations, messageStore: { ...messageStoreAfterReassignment, messages: processedMessageStore.messages, threads: processedMessageStore.threads, local: processedMessageStore.local, }, }; } else if (action.type === setClientDBStoreActionType) { const actionPayloadMessageStoreThreads = translateClientDBThreadMessageInfos( action.payload.messageStoreThreads ?? [], ); const actionPayloadMessageStoreLocalMessageInfos = action.payload.messageStoreLocalMessageInfos ?? {}; const messageStoreOperations: Array = []; for (const localMessageID in actionPayloadMessageStoreLocalMessageInfos) { if ( actionPayloadMessageStoreLocalMessageInfos[localMessageID] .outboundP2PMessages && actionPayloadMessageStoreLocalMessageInfos[localMessageID] .outboundP2PMessages.length > 0 ) { // If there are `outboundP2PMessages` it means the message failed, // but setting `sendFailed` could not be done, e.g. when the app was // killed in the process of sending messages. messageStoreOperations.push({ type: 'replace_local_message_info', payload: { id: localMessageID, localMessageInfo: { ...actionPayloadMessageStoreLocalMessageInfos[localMessageID], sendFailed: true, }, }, }); } } const newThreads: { [threadID: string]: ThreadMessageInfo, } = { ...messageStore.threads }; for (const threadID in actionPayloadMessageStoreThreads) { newThreads[threadID] = { ...actionPayloadMessageStoreThreads[threadID], messageIDs: messageStore.threads?.[threadID]?.messageIDs ?? [], }; } const payloadMessages = action.payload.messages; if (!payloadMessages) { return { messageStoreOperations: [], messageStore: { ...messageStore, threads: newThreads, local: actionPayloadMessageStoreLocalMessageInfos, }, }; } const { messageStoreOperations: updatedMessageStoreOperations, messageStore: updatedMessageStore, } = updateMessageStoreWithLatestThreadInfos( { ...messageStore, threads: newThreads, local: actionPayloadMessageStoreLocalMessageInfos, }, newThreadInfos, ); messageStoreOperations.push(...updatedMessageStoreOperations); let threads = { ...updatedMessageStore.threads }; let local = { ...updatedMessageStore.local }; // Store message IDs already contained within threads so that we // do not insert duplicates const existingMessageIDs = new Set(); for (const threadID in threads) { threads[threadID].messageIDs.forEach(msgID => { existingMessageIDs.add(msgID); }); } const threadsNeedMsgIDsResorting = new Set(); const actionPayloadMessages = messageStoreOpsHandlers.translateClientDBData(payloadMessages); // When starting the app on native, we filter out any local-only multimedia // messages because the relevant context is no longer available const messageIDsToBeRemoved = []; const threadsToAdd: { [string]: ThreadMessageInfo } = {}; for (const id in actionPayloadMessages) { const message = actionPayloadMessages[id]; const { threadID } = message; let existingThread = threads[threadID]; if (!existingThread) { existingThread = newThread(); threadsToAdd[threadID] = existingThread; } if ( (message.type === messageTypes.IMAGES || message.type === messageTypes.MULTIMEDIA) && !message.id ) { messageIDsToBeRemoved.push(id); threads = { ...threads, [(threadID: string)]: { ...existingThread, messageIDs: existingThread.messageIDs.filter( curMessageID => curMessageID !== id, ), }, }; local = _pickBy( (localInfo: LocalMessageInfo, key: string) => key !== id, )(local); } else if (!existingMessageIDs.has(id)) { threads = { ...threads, [(threadID: string)]: { ...existingThread, messageIDs: [...existingThread.messageIDs, id], }, }; threadsNeedMsgIDsResorting.add(threadID); } else if (!threads[threadID]) { threads = { ...threads, [(threadID: string)]: existingThread }; } } for (const threadID of threadsNeedMsgIDsResorting) { threads[threadID].messageIDs = sortMessageIDs(actionPayloadMessages)( threads[threadID].messageIDs, ); } const newMessageStore = { ...updatedMessageStore, messages: actionPayloadMessages, threads: threads, local: local, }; if (messageIDsToBeRemoved.length > 0) { messageStoreOperations.push({ type: 'remove', payload: { ids: messageIDsToBeRemoved }, }); } const processedMessageStore = processMessageStoreOperations( newMessageStore, messageStoreOperations, ); messageStoreOperations.push({ type: 'replace_threads', payload: { threads: threadsToAdd }, }); return { messageStoreOperations, messageStore: processedMessageStore, }; } else if (action.type === processDMOpsActionType) { const { rawMessageInfos, updateInfos, outboundP2PMessages, composableMessageID, } = action.payload; const sendingComposableDMMessageToPeers = composableMessageID && outboundP2PMessages && outboundP2PMessages.length > 0; + // For composable DM messages, we rely on dedicated actions + // (`sendTextMessageActionTypes` or `sendMultimediaMessageActionTypes`) + // to update the store, as we want to keep existing sending logic, + // tracking status, etc. + // We use `processDMOpsActionType` to schedule messages to other peers, + // additionally, we need `rawMessageInfos` in the payload to calculate + // the reply count and set the unread status in the thread store, but here + // we want to ignore it, as this could result in the message being added + // to the store twice. + const updatedRawMessageInfos = sendingComposableDMMessageToPeers + ? [] + : rawMessageInfos; + if ( - rawMessageInfos.length === 0 && + updatedRawMessageInfos.length === 0 && updateInfos.length === 0 && !sendingComposableDMMessageToPeers ) { return { messageStoreOperations: [], messageStore }; } const messagesResult = mergeUpdatesWithMessageInfos( - rawMessageInfos, + updatedRawMessageInfos, updateInfos, {}, ); const { messageStoreOperations, messageStore: newMessageStore } = mergeNewMessages( messageStore, messagesResult.rawMessageInfos, messagesResult.truncationStatuses, newThreadInfos, ); if ( !sendingComposableDMMessageToPeers || !composableMessageID || !outboundP2PMessages || // This is a case when a message is retried, // no additional changes are required. !!messageStore.local[composableMessageID] ) { return { messageStoreOperations, messageStore: newMessageStore }; } // Messages to other peers that can be retried from UI, // we need to track statuses of each one. const outboundP2PMessageIDs = outboundP2PMessages.map(msg => msg.messageID); const localOperation: ReplaceMessageStoreLocalMessageInfoOperation = { type: 'replace_local_message_info', payload: { id: composableMessageID, localMessageInfo: { // Setting it to `false` because sending a message is in progress. // It will be set to `true` after we fail to queue at least one // message on Tunnelbroker, or this entry will be removed on success. sendFailed: false, outboundP2PMessageIDs, }, }, }; return { messageStoreOperations: [...messageStoreOperations, localOperation], messageStore: processMessageStoreOperations(newMessageStore, [ localOperation, ]), }; } return { messageStoreOperations: [], messageStore }; } type MergedUpdatesWithMessages = { +rawMessageInfos: $ReadOnlyArray, +truncationStatuses: MessageTruncationStatuses, }; function mergeUpdatesWithMessageInfos( messageInfos: $ReadOnlyArray, newUpdates: $ReadOnlyArray, truncationStatuses?: MessageTruncationStatuses, ): MergedUpdatesWithMessages { const messageIDs = new Set( messageInfos.map(messageInfo => messageInfo.id).filter(Boolean), ); const mergedMessageInfos = [...messageInfos]; const mergedTruncationStatuses = { ...truncationStatuses }; for (const update of newUpdates) { const { mergeMessageInfosAndTruncationStatuses } = updateSpecs[update.type]; if (!mergeMessageInfosAndTruncationStatuses) { continue; } mergeMessageInfosAndTruncationStatuses( messageIDs, mergedMessageInfos, mergedTruncationStatuses, update, ); } return { rawMessageInfos: mergedMessageInfos, truncationStatuses: mergedTruncationStatuses, }; } export { freshMessageStore, reduceMessageStore, mergeUpdatesWithMessageInfos }; diff --git a/lib/reducers/thread-reducer.js b/lib/reducers/thread-reducer.js index 47fb3d4ae..f215a458e 100644 --- a/lib/reducers/thread-reducer.js +++ b/lib/reducers/thread-reducer.js @@ -1,589 +1,615 @@ // @flow import { setThreadUnreadStatusActionTypes, updateActivityActionTypes, } from '../actions/activity-actions.js'; import { setClientDBStoreActionType } from '../actions/client-db-store-actions.js'; import { saveMessagesActionType } from '../actions/message-actions.js'; import { legacySiweAuthActionTypes } from '../actions/siwe-actions.js'; import { changeThreadSettingsActionTypes, deleteThreadActionTypes, newThreadActionTypes, removeUsersFromThreadActionTypes, changeThreadMemberRolesActionTypes, joinThreadActionTypes, leaveThreadActionTypes, modifyCommunityRoleActionTypes, deleteCommunityRoleActionTypes, } from '../actions/thread-actions.js'; import { fetchPendingUpdatesActionTypes } from '../actions/update-actions.js'; import { keyserverAuthActionTypes, deleteKeyserverAccountActionTypes, legacyLogInActionTypes, legacyKeyserverRegisterActionTypes, updateSubscriptionActionTypes, } from '../actions/user-actions.js'; import { getThreadIDsForKeyservers, extractKeyserverIDFromID, } from '../keyserver-conn/keyserver-call-utils.js'; import { setNewSessionActionType } from '../keyserver-conn/keyserver-conn-types.js'; import { type ThreadStoreOperation, threadStoreOpsHandlers, } from '../ops/thread-store-ops.js'; +import { getThreadUpdatesForNewMessages } from '../shared/dm-ops/dm-op-utils.js'; import { stateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { updateSpecs } from '../shared/updates/update-specs.js'; import { processDMOpsActionType } from '../types/dm-ops.js'; import type { RawThreadInfo } from '../types/minimally-encoded-thread-permissions-types.js'; import type { BaseAction } from '../types/redux-types.js'; import { type ClientThreadInconsistencyReportCreationRequest } from '../types/report-types.js'; import { serverRequestTypes, processServerRequestsActionType, } from '../types/request-types.js'; import { fullStateSyncActionType, incrementalStateSyncActionType, stateSyncPayloadTypes, } from '../types/socket-types.js'; import type { RawThreadInfos, ThreadStore } from '../types/thread-types.js'; import { type ClientUpdateInfo, processUpdatesActionType, } from '../types/update-types.js'; const { processStoreOperations: processThreadStoreOperations } = threadStoreOpsHandlers; function generateOpsAndProcessThreadUpdates( threadStore: ThreadStore, newUpdates: $ReadOnlyArray, ): { +threadStoreOperations: $ReadOnlyArray, +updatedThreadStore: ThreadStore, } { const operations: Array = []; let store = threadStore; for (const update of newUpdates) { const ops = updateSpecs[update.type].generateOpsForThreadUpdates?.( store.threadInfos, update, ); if (!ops || ops.length === 0) { continue; } operations.push(...ops); store = processThreadStoreOperations(store, ops); } return { threadStoreOperations: operations, updatedThreadStore: store }; } type ReduceThreadInfosResult = { threadStore: ThreadStore, newThreadInconsistencies: $ReadOnlyArray, threadStoreOperations: $ReadOnlyArray, }; function handleFullStateSync( state: ThreadStore, keyserverID: string, newThreadInfos: RawThreadInfos, ): ReduceThreadInfosResult { const threadsToRemove = Object.keys(state.threadInfos).filter( key => extractKeyserverIDFromID(key) === keyserverID, ); const threadStoreOperations = [ { type: 'remove', payload: { ids: threadsToRemove }, }, ...Object.keys(newThreadInfos).map((id: string) => ({ type: 'replace', payload: { id, threadInfo: newThreadInfos[id] }, })), ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } function reduceThreadInfos( state: ThreadStore, action: BaseAction, + viewerID: ?string, ): ReduceThreadInfosResult { if (action.type === fullStateSyncActionType) { return handleFullStateSync( state, action.payload.keyserverID, action.payload.threadInfos, ); } else if ( action.type === fetchPendingUpdatesActionTypes.success && action.payload.type === stateSyncPayloadTypes.FULL ) { return handleFullStateSync( state, action.payload.keyserverID, action.payload.threadInfos, ); } else if ( action.type === legacyLogInActionTypes.success || action.type === legacySiweAuthActionTypes.success || action.type === legacyKeyserverRegisterActionTypes.success ) { const newThreadInfos = action.payload.threadInfos; const threadStoreOperations = [ { type: 'remove_all', }, ...Object.keys(newThreadInfos).map((id: string) => ({ type: 'replace', payload: { id, threadInfo: newThreadInfos[id] }, })), ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === keyserverAuthActionTypes.success) { const keyserverIDs = Object.keys(action.payload.updatesCurrentAsOf); const threadIDsToRemove = getThreadIDsForKeyservers( Object.keys(state.threadInfos), keyserverIDs, ); const newThreadInfos = action.payload.threadInfos; const threadStoreOperations = [ { type: 'remove', payload: { ids: threadIDsToRemove }, }, ...Object.keys(newThreadInfos).map((id: string) => ({ type: 'replace', payload: { id, threadInfo: newThreadInfos[id] }, })), ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === deleteKeyserverAccountActionTypes.success) { const threadIDsToRemove = getThreadIDsForKeyservers( Object.keys(state.threadInfos), action.payload.keyserverIDs, ); if (threadIDsToRemove.length === 0) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const threadStoreOperations = [ { type: 'remove', payload: { ids: threadIDsToRemove }, }, ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if ( action.type === setNewSessionActionType && action.payload.sessionChange.cookieInvalidated ) { const threadIDsToRemove = getThreadIDsForKeyservers( Object.keys(state.threadInfos), [action.payload.keyserverID], ); if (threadIDsToRemove.length === 0) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const threadStoreOperations = [ { type: 'remove', payload: { ids: threadIDsToRemove }, }, ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if ( action.type === joinThreadActionTypes.success || action.type === leaveThreadActionTypes.success || action.type === deleteThreadActionTypes.success || action.type === changeThreadSettingsActionTypes.success || action.type === removeUsersFromThreadActionTypes.success || action.type === changeThreadMemberRolesActionTypes.success || action.type === incrementalStateSyncActionType || action.type === processUpdatesActionType || action.type === newThreadActionTypes.success || action.type === modifyCommunityRoleActionTypes.success || action.type === deleteCommunityRoleActionTypes.success ) { const { newUpdates } = action.payload.updatesResult; if (newUpdates.length === 0) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const { threadStoreOperations, updatedThreadStore } = generateOpsAndProcessThreadUpdates(state, newUpdates); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if ( action.type === fetchPendingUpdatesActionTypes.success && action.payload.type === stateSyncPayloadTypes.INCREMENTAL ) { const { newUpdates } = action.payload.updatesResult; if (newUpdates.length === 0) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const { threadStoreOperations, updatedThreadStore } = generateOpsAndProcessThreadUpdates(state, newUpdates); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === updateSubscriptionActionTypes.success) { const { threadID, subscription } = action.payload; const threadInfo = state.threadInfos[threadID]; const newThreadInfo = { ...threadInfo, currentUser: { ...threadInfo.currentUser, subscription, }, }; const threadStoreOperations = [ { type: 'replace', payload: { id: threadID, threadInfo: newThreadInfo, }, }, ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === saveMessagesActionType) { const threadIDToMostRecentTime = new Map(); for (const messageInfo of action.payload.rawMessageInfos) { const current = threadIDToMostRecentTime.get(messageInfo.threadID); if (!current || current < messageInfo.time) { threadIDToMostRecentTime.set(messageInfo.threadID, messageInfo.time); } } const changedThreadInfos: { [string]: RawThreadInfo } = {}; for (const [threadID, mostRecentTime] of threadIDToMostRecentTime) { const threadInfo = state.threadInfos[threadID]; if (!threadInfo) { continue; } if (threadInfo.thick) { if (threadInfo.timestamps.currentUser.unread > mostRecentTime) { continue; } changedThreadInfos[threadID] = { ...threadInfo, timestamps: { ...threadInfo.timestamps, currentUser: { ...threadInfo.timestamps.currentUser, unread: mostRecentTime, }, }, }; if (!threadInfo.currentUser.unread) { changedThreadInfos[threadID] = { ...changedThreadInfos[threadID], currentUser: { ...threadInfo.currentUser, unread: true, }, }; } continue; } if ( !action.payload.updatesCurrentAsOf || threadInfo.currentUser.unread || action.payload.updatesCurrentAsOf > mostRecentTime ) { continue; } changedThreadInfos[threadID] = { ...threadInfo, currentUser: { ...threadInfo.currentUser, unread: true, }, }; } if (Object.keys(changedThreadInfos).length !== 0) { const threadStoreOperations = Object.keys(changedThreadInfos).map(id => ({ type: 'replace', payload: { id, threadInfo: changedThreadInfos[id], }, })); const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } } else if (action.type === processServerRequestsActionType) { const checkStateRequest = action.payload.serverRequests.find( candidate => candidate.type === serverRequestTypes.CHECK_STATE, ); if (!checkStateRequest || !checkStateRequest.stateChanges) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const { rawThreadInfos, deleteThreadIDs } = checkStateRequest.stateChanges; if (!rawThreadInfos && !deleteThreadIDs) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const threadStoreOperations: ThreadStoreOperation[] = []; if (rawThreadInfos) { for (const rawThreadInfo of rawThreadInfos) { threadStoreOperations.push({ type: 'replace', payload: { id: rawThreadInfo.id, threadInfo: rawThreadInfo, }, }); } } if (deleteThreadIDs) { threadStoreOperations.push({ type: 'remove', payload: { ids: deleteThreadIDs, }, }); } const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); const newThreadInconsistencies = stateSyncSpecs.threads.findStoreInconsistencies( action, state.threadInfos, updatedThreadStore.threadInfos, ); return { threadStore: updatedThreadStore, newThreadInconsistencies, threadStoreOperations, }; } else if (action.type === updateActivityActionTypes.success) { const updatedThreadInfos: { [string]: RawThreadInfo } = {}; for (const setToUnread of action.payload.result.unfocusedToUnread) { const threadInfo = state.threadInfos[setToUnread]; if (threadInfo && !threadInfo.currentUser.unread) { updatedThreadInfos[setToUnread] = { ...threadInfo, currentUser: { ...threadInfo.currentUser, unread: true, }, }; } } if (Object.keys(updatedThreadInfos).length === 0) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const threadStoreOperations = Object.keys(updatedThreadInfos).map(id => ({ type: 'replace', payload: { id, threadInfo: updatedThreadInfos[id], }, })); const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === setThreadUnreadStatusActionTypes.started) { const { threadID, unread } = action.payload; const threadInfo = state.threadInfos[threadID]; const updatedThreadInfo = { ...threadInfo, currentUser: { ...threadInfo.currentUser, unread, }, }; const threadStoreOperations = [ { type: 'replace', payload: { id: threadID, threadInfo: updatedThreadInfo, }, }, ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === setThreadUnreadStatusActionTypes.success) { const { threadID, resetToUnread } = action.payload; const threadInfo = state.threadInfos[threadID]; const { currentUser } = threadInfo; if (!resetToUnread || currentUser.unread) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const updatedThread = { ...threadInfo, currentUser: { ...currentUser, unread: true }, }; const threadStoreOperations = [ { type: 'replace', payload: { id: threadID, threadInfo: updatedThread, }, }, ]; const updatedThreadStore = processThreadStoreOperations( state, threadStoreOperations, ); return { threadStore: updatedThreadStore, newThreadInconsistencies: [], threadStoreOperations, }; } else if (action.type === setClientDBStoreActionType) { return { threadStore: action.payload.threadStore ?? state, newThreadInconsistencies: [], threadStoreOperations: [], }; } else if (action.type === processDMOpsActionType) { - const { updateInfos } = action.payload; - if (updateInfos.length === 0) { + const { updateInfos, rawMessageInfos } = action.payload; + if (updateInfos.length === 0 && rawMessageInfos.length === 0) { return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } const { threadStoreOperations, updatedThreadStore } = generateOpsAndProcessThreadUpdates(state, updateInfos); + + const newMessagesUpdates = getThreadUpdatesForNewMessages( + rawMessageInfos, + updateInfos, + updatedThreadStore.threadInfos, + viewerID, + ); + + if (newMessagesUpdates.length === 0) { + return { + threadStore: updatedThreadStore, + newThreadInconsistencies: [], + threadStoreOperations, + }; + } + + const { + threadStoreOperations: threadStoreOperationsWithNewMessagesUpdates, + updatedThreadStore: threadStoreWithNewMessagesUpdates, + } = generateOpsAndProcessThreadUpdates( + updatedThreadStore, + newMessagesUpdates, + ); + return { - threadStore: updatedThreadStore, + threadStore: threadStoreWithNewMessagesUpdates, newThreadInconsistencies: [], - threadStoreOperations, + threadStoreOperations: threadStoreOperationsWithNewMessagesUpdates, }; } return { threadStore: state, newThreadInconsistencies: [], threadStoreOperations: [], }; } export { reduceThreadInfos }; diff --git a/lib/shared/dm-ops/dm-op-utils.js b/lib/shared/dm-ops/dm-op-utils.js index bc7728963..d6e6d159d 100644 --- a/lib/shared/dm-ops/dm-op-utils.js +++ b/lib/shared/dm-ops/dm-op-utils.js @@ -1,455 +1,440 @@ // @flow import invariant from 'invariant'; import _groupBy from 'lodash/fp/groupBy.js'; import * as React from 'react'; import uuid from 'uuid'; import { type ProcessDMOperationUtilities } from './dm-op-spec.js'; import { dmOpSpecs } from './dm-op-specs.js'; import { useProcessAndSendDMOperation } from './process-dm-ops.js'; import { useFindUserIdentities } from '../../actions/find-user-identities-actions.js'; import { useLoggedInUserInfo } from '../../hooks/account-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useGetAndUpdateDeviceListsForUsers } from '../../hooks/peer-list-hooks.js'; import { mergeUpdatesWithMessageInfos } from '../../reducers/message-reducer.js'; import { getAllPeerUserIDAndDeviceIDs } from '../../selectors/user-selectors.js'; import { type P2PMessageRecipient } from '../../tunnelbroker/peer-to-peer-context.js'; import type { CreateThickRawThreadInfoInput, DMAddMembersOperation, DMAddViewerToThreadMembersOperation, DMOperation, ComposableDMOperation, } from '../../types/dm-ops.js'; import type { RawMessageInfo } from '../../types/message-types.js'; import type { - RawThreadInfo, ThickRawThreadInfo, ThreadInfo, } from '../../types/minimally-encoded-thread-permissions-types.js'; import type { InboundActionMetadata } from '../../types/redux-types.js'; import { outboundP2PMessageStatuses, type OutboundP2PMessage, } from '../../types/sqlite-types.js'; import { assertThickThreadType, thickThreadTypes, } from '../../types/thread-types-enum.js'; -import type { LegacyRawThreadInfo } from '../../types/thread-types.js'; +import type { RawThreadInfos } from '../../types/thread-types.js'; import { type DMOperationP2PMessage, userActionsP2PMessageTypes, } from '../../types/tunnelbroker/user-actions-peer-to-peer-message-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ClientUpdateInfo } from '../../types/update-types.js'; import { getContentSigningKey } from '../../utils/crypto-utils.js'; import { useSelector } from '../../utils/redux-utils.js'; import { messageSpecs } from '../messages/message-specs.js'; import { userHasDeviceList } from '../thread-utils.js'; -import { updateSpecs } from '../updates/update-specs.js'; function generateMessagesToPeers( message: DMOperation, peers: $ReadOnlyArray<{ +userID: string, +deviceID: string, }>, ): $ReadOnlyArray { const opMessage: DMOperationP2PMessage = { type: userActionsP2PMessageTypes.DM_OPERATION, op: message, }; const plaintext = JSON.stringify(opMessage); const outboundP2PMessages = []; for (const peer of peers) { const messageToPeer: OutboundP2PMessage = { messageID: uuid.v4(), deviceID: peer.deviceID, userID: peer.userID, timestamp: new Date().getTime().toString(), plaintext, ciphertext: '', status: outboundP2PMessageStatuses.persisted, supportsAutoRetry: dmOpSpecs[message.type].supportsAutoRetry, }; outboundP2PMessages.push(messageToPeer); } return outboundP2PMessages; } export const dmOperationSpecificationTypes = Object.freeze({ OUTBOUND: 'OutboundDMOperationSpecification', INBOUND: 'InboundDMOperationSpecification', }); type OutboundDMOperationSpecificationRecipients = | { +type: 'all_peer_devices' | 'self_devices' } | { +type: 'some_users', +userIDs: $ReadOnlyArray } | { +type: 'all_thread_members', +threadID: string } | { +type: 'some_devices', +deviceIDs: $ReadOnlyArray }; // The operation generated on the sending client, causes changes to // the state and broadcasting information to peers. export type OutboundDMOperationSpecification = { +type: 'OutboundDMOperationSpecification', +op: DMOperation, +recipients: OutboundDMOperationSpecificationRecipients, +sendOnly?: boolean, }; export type OutboundComposableDMOperationSpecification = { +type: 'OutboundDMOperationSpecification', +op: ComposableDMOperation, +recipients: OutboundDMOperationSpecificationRecipients, // Composable DM Ops are created only to be sent, locally we use // dedicated mechanism for updating the store. +sendOnly: true, +composableMessageID: string, }; // The operation received from other peers, causes changes to // the state and after processing, sends confirmation to the sender. export type InboundDMOperationSpecification = { +type: 'InboundDMOperationSpecification', +op: DMOperation, +metadata: ?InboundActionMetadata, }; export type DMOperationSpecification = | OutboundDMOperationSpecification | InboundDMOperationSpecification; function useCreateMessagesToPeersFromDMOp(): ( operation: DMOperation, recipients: OutboundDMOperationSpecificationRecipients, ) => Promise<$ReadOnlyArray> { const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const utilities = useSendDMOperationUtils(); const auxUserInfos = useSelector(state => state.auxUserStore.auxUserInfos); const getAndUpdateDeviceListsForUsers = useGetAndUpdateDeviceListsForUsers(); const getUsersWithoutDeviceList = React.useCallback( (userIDs: $ReadOnlyArray) => { const missingDeviceListsUserIDs: Array = []; for (const userID of userIDs) { const supportsThickThreads = userHasDeviceList(userID, auxUserInfos); if (!supportsThickThreads) { missingDeviceListsUserIDs.push(userID); } } return missingDeviceListsUserIDs; }, [auxUserInfos], ); const getMissingPeers = React.useCallback( async ( userIDs: $ReadOnlyArray, ): Promise<$ReadOnlyArray> => { const missingDeviceListsUserIDs = getUsersWithoutDeviceList(userIDs); if (missingDeviceListsUserIDs.length === 0) { return []; } const deviceLists = await getAndUpdateDeviceListsForUsers( missingDeviceListsUserIDs, true, ); const updatedPeers: Array = []; for (const userID of missingDeviceListsUserIDs) { if (deviceLists[userID] && deviceLists[userID].devices.length > 0) { updatedPeers.push( ...deviceLists[userID].devices.map(deviceID => ({ deviceID, userID, })), ); } } return updatedPeers; }, [getAndUpdateDeviceListsForUsers, getUsersWithoutDeviceList], ); return React.useCallback( async ( operation: DMOperation, recipients: OutboundDMOperationSpecificationRecipients, ): Promise<$ReadOnlyArray> => { const { viewerID, threadInfos } = utilities; if (!viewerID) { return []; } let peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs; if (recipients.type === 'self_devices') { peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter( peer => peer.userID === viewerID, ); } else if (recipients.type === 'some_users') { const missingPeers = await getMissingPeers(recipients.userIDs); const updatedPeers = [...allPeerUserIDAndDeviceIDs, ...missingPeers]; const userIDs = new Set(recipients.userIDs); peerUserIDAndDeviceIDs = updatedPeers.filter(peer => userIDs.has(peer.userID), ); } else if (recipients.type === 'all_thread_members') { const { threadID } = recipients; if (!threadInfos[threadID]) { console.log( `all_thread_members called for threadID ${threadID}, which is ` + 'missing from the ThreadStore. if sending a message soon after ' + 'thread creation, consider some_users instead', ); } const members = threadInfos[recipients.threadID]?.members ?? []; const memberIDs = members.map(member => member.id); const missingPeers = await getMissingPeers(memberIDs); const updatedPeers = [...allPeerUserIDAndDeviceIDs, ...missingPeers]; const userIDs = new Set(memberIDs); peerUserIDAndDeviceIDs = updatedPeers.filter(peer => userIDs.has(peer.userID), ); } else if (recipients.type === 'some_devices') { const deviceIDs = new Set(recipients.deviceIDs); peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter(peer => deviceIDs.has(peer.deviceID), ); } const thisDeviceID = await getContentSigningKey(); const targetPeers = peerUserIDAndDeviceIDs.filter( peer => peer.deviceID !== thisDeviceID, ); return generateMessagesToPeers(operation, targetPeers); }, [allPeerUserIDAndDeviceIDs, getMissingPeers, utilities], ); } function getCreateThickRawThreadInfoInputFromThreadInfo( threadInfo: ThickRawThreadInfo, ): CreateThickRawThreadInfoInput { const roleID = Object.keys(threadInfo.roles).pop(); const thickThreadType = assertThickThreadType(threadInfo.type); return { threadID: threadInfo.id, threadType: thickThreadType, creationTime: threadInfo.creationTime, parentThreadID: threadInfo.parentThreadID, allMemberIDsWithSubscriptions: threadInfo.members.map( ({ id, subscription }) => ({ id, subscription, }), ), roleID, unread: !!threadInfo.currentUser.unread, name: threadInfo.name, avatar: threadInfo.avatar, description: threadInfo.description, color: threadInfo.color, containingThreadID: threadInfo.containingThreadID, sourceMessageID: threadInfo.sourceMessageID, repliesCount: threadInfo.repliesCount, pinnedCount: threadInfo.pinnedCount, timestamps: threadInfo.timestamps, }; } function useAddDMThreadMembers(): ( newMemberIDs: $ReadOnlyArray, threadInfo: ThreadInfo, ) => Promise { const viewerID = useSelector( state => state.currentUserInfo && state.currentUserInfo.id, ); const processAndSendDMOperation = useProcessAndSendDMOperation(); const threadInfos = useSelector(state => state.threadStore.threadInfos); return React.useCallback( async (newMemberIDs: $ReadOnlyArray, threadInfo: ThreadInfo) => { const rawThreadInfo = threadInfos[threadInfo.id]; invariant(rawThreadInfo.thick, 'thread should be thick'); const existingThreadDetails = getCreateThickRawThreadInfoInputFromThreadInfo(rawThreadInfo); invariant(viewerID, 'viewerID should be set'); const addViewerToThreadMembersOperation: DMAddViewerToThreadMembersOperation = { type: 'add_viewer_to_thread_members', existingThreadDetails, editorID: viewerID, time: Date.now(), messageID: uuid.v4(), addedUserIDs: newMemberIDs, }; const viewerOperationSpecification: OutboundDMOperationSpecification = { type: dmOperationSpecificationTypes.OUTBOUND, op: addViewerToThreadMembersOperation, recipients: { type: 'some_users', userIDs: newMemberIDs, }, sendOnly: true, }; invariant(viewerID, 'viewerID should be set'); const addMembersOperation: DMAddMembersOperation = { type: 'add_members', threadID: threadInfo.id, editorID: viewerID, time: Date.now(), messageID: uuid.v4(), addedUserIDs: newMemberIDs, }; const newMemberIDsSet = new Set(newMemberIDs); const recipientsThreadID = threadInfo.type === thickThreadTypes.THICK_SIDEBAR && threadInfo.parentThreadID ? threadInfo.parentThreadID : threadInfo.id; const existingMembers = threadInfos[recipientsThreadID]?.members ?.map(member => member.id) ?.filter(memberID => !newMemberIDsSet.has(memberID)) ?? []; const addMembersOperationSpecification: OutboundDMOperationSpecification = { type: dmOperationSpecificationTypes.OUTBOUND, op: addMembersOperation, recipients: { type: 'some_users', userIDs: existingMembers, }, }; await Promise.all([ processAndSendDMOperation(viewerOperationSpecification), processAndSendDMOperation(addMembersOperationSpecification), ]); }, [processAndSendDMOperation, threadInfos, viewerID], ); } function getThreadUpdatesForNewMessages( rawMessageInfos: $ReadOnlyArray, updateInfos: $ReadOnlyArray, - utilities: ProcessDMOperationUtilities, + threadInfos: RawThreadInfos, + viewerID: ?string, ): Array { - const { threadInfos, viewerID } = utilities; + if (!viewerID) { + return []; + } const { rawMessageInfos: allNewMessageInfos } = mergeUpdatesWithMessageInfos( rawMessageInfos, updateInfos, ); const messagesByThreadID = _groupBy(message => message.threadID)( allNewMessageInfos, ); - const updatedThreadInfosByThreadID: { - [string]: RawThreadInfo | LegacyRawThreadInfo, - } = {}; - for (const threadID in messagesByThreadID) { - updatedThreadInfosByThreadID[threadID] = threadInfos[threadID]; - } - for (const update of updateInfos) { - const updatedThreadInfo = updateSpecs[update.type].getUpdatedThreadInfo?.( - update, - updatedThreadInfosByThreadID, - ); - if (updatedThreadInfo) { - updatedThreadInfosByThreadID[updatedThreadInfo.id] = updatedThreadInfo; - } - } - const newUpdateInfos: Array = []; for (const threadID in messagesByThreadID) { const repliesCountIncreasingMessages = messagesByThreadID[threadID].filter( message => messageSpecs[message.type].includedInRepliesCount, ); - let threadInfo = updatedThreadInfosByThreadID[threadID]; + let threadInfo = threadInfos[threadID]; if (repliesCountIncreasingMessages.length > 0) { const repliesCountIncreaseTime = Math.max( repliesCountIncreasingMessages.map(message => message.time), ); + const oldRepliesCount = threadInfo?.repliesCount ?? 0; const newThreadInfo = { ...threadInfo, - repliesCount: - threadInfo.repliesCount + repliesCountIncreasingMessages.length, + repliesCount: oldRepliesCount + repliesCountIncreasingMessages.length, }; newUpdateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time: repliesCountIncreaseTime, threadInfo: newThreadInfo, }); threadInfo = newThreadInfo; } const messagesFromOtherPeers = messagesByThreadID[threadID].filter( message => message.creatorID !== viewerID, ); if (messagesFromOtherPeers.length === 0) { continue; } // We take the most recent timestamp to make sure that // change_thread_read_status operation older // than it won't flip the status to read. const time = Math.max(messagesFromOtherPeers.map(message => message.time)); invariant(threadInfo.thick, 'Thread should be thick'); // We aren't checking if the unread timestamp is lower than the time. // We're doing this because we want to flip the thread to unread after // any new message from a non-viewer. newUpdateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: uuid.v4(), time, threadID: threadInfo.id, unread: true, }); } return newUpdateInfos; } function useSendDMOperationUtils(): $ReadOnly<{ ...ProcessDMOperationUtilities, viewerID: ?string, }> { const fetchMessage = useGetLatestMessageEdit(); const threadInfos = useSelector(state => state.threadStore.threadInfos); const entryInfos = useSelector(state => state.entryStore.entryInfos); const findUserIdentities = useFindUserIdentities(); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; return React.useMemo( () => ({ viewerID, fetchMessage, threadInfos, entryInfos, findUserIdentities, }), [viewerID, fetchMessage, threadInfos, entryInfos, findUserIdentities], ); } export { useCreateMessagesToPeersFromDMOp, useAddDMThreadMembers, getCreateThickRawThreadInfoInputFromThreadInfo, getThreadUpdatesForNewMessages, useSendDMOperationUtils, }; diff --git a/lib/shared/dm-ops/process-dm-ops.js b/lib/shared/dm-ops/process-dm-ops.js index 2ff78ed0d..cb0bc9c2a 100644 --- a/lib/shared/dm-ops/process-dm-ops.js +++ b/lib/shared/dm-ops/process-dm-ops.js @@ -1,390 +1,375 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import type { ProcessDMOperationUtilities } from './dm-op-spec.js'; import { dmOpSpecs } from './dm-op-specs.js'; import { type OutboundDMOperationSpecification, type DMOperationSpecification, useCreateMessagesToPeersFromDMOp, dmOperationSpecificationTypes, type OutboundComposableDMOperationSpecification, - getThreadUpdatesForNewMessages, useSendDMOperationUtils, } from './dm-op-utils.js'; import { useProcessBlobHolders } from '../../actions/holder-actions.js'; import { processNewUserIDsActionType } from '../../actions/user-actions.js'; import { useDispatchWithMetadata } from '../../hooks/ops-hooks.js'; import { usePeerToPeerCommunication, type ProcessOutboundP2PMessagesResult, } from '../../tunnelbroker/peer-to-peer-context.js'; import { processDMOpsActionType, queueDMOpsActionType, dmOperationValidator, } from '../../types/dm-ops.js'; import type { NotificationsCreationData } from '../../types/notif-types.js'; import type { DispatchMetadata } from '../../types/redux-types.js'; import type { OutboundP2PMessage } from '../../types/sqlite-types.js'; import { extractUserIDsFromPayload } from '../../utils/conversion-utils.js'; import { useSelector, useDispatch } from '../../utils/redux-utils.js'; function useProcessDMOperation(): ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => Promise { const baseUtilities = useSendDMOperationUtils(); const dispatchWithMetadata = useDispatchWithMetadata(); const processBlobHolders = useProcessBlobHolders(); const createMessagesToPeersFromDMOp = useCreateMessagesToPeersFromDMOp(); const dispatch = useDispatch(); return React.useCallback( async ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => { const { viewerID, ...restUtilities } = baseUtilities; if (!viewerID) { console.log('ignored DMOperation because logged out'); return; } const utilities: ProcessDMOperationUtilities = { ...restUtilities, viewerID, }; const { op: dmOp } = dmOperationSpecification; let outboundP2PMessages: ?$ReadOnlyArray = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND ) { outboundP2PMessages = await createMessagesToPeersFromDMOp( dmOp, dmOperationSpecification.recipients, ); } let dispatchMetadata: ?DispatchMetadata = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOpID ) { dispatchMetadata = { dmOpID, }; } else if ( dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND ) { dispatchMetadata = dmOperationSpecification.metadata; } let composableMessageID: ?string = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && !dmOpSpecs[dmOp.type].supportsAutoRetry ) { composableMessageID = dmOp.messageID; } if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOperationSpecification.sendOnly ) { const notificationsCreationData = await dmOpSpecs[ dmOp.type ].notificationsCreationData?.(dmOp, utilities); dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos: [], updateInfos: [], outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, dispatchMetadata, ); return; } if (!dmOpSpecs[dmOp.type].operationValidator.is(dmOp)) { console.log(`Ignoring ${dmOp.type} operation because it is invalid`); return; } const processingCheckResult = await dmOpSpecs[dmOp.type].canBeProcessed( dmOp, utilities, ); if (!processingCheckResult.isProcessingPossible) { if (processingCheckResult.reason.type === 'invalid') { return; } let condition; if (processingCheckResult.reason.type === 'missing_thread') { condition = { type: 'thread', threadID: processingCheckResult.reason.threadID, }; } else if (processingCheckResult.reason.type === 'missing_entry') { condition = { type: 'entry', entryID: processingCheckResult.reason.entryID, }; } else if (processingCheckResult.reason.type === 'missing_message') { condition = { type: 'message', messageID: processingCheckResult.reason.messageID, }; } else if (processingCheckResult.reason.type === 'missing_membership') { condition = { type: 'membership', threadID: processingCheckResult.reason.threadID, userID: processingCheckResult.reason.userID, }; } dispatchWithMetadata( { type: queueDMOpsActionType, payload: { operation: dmOp, timestamp: Date.now(), condition, }, }, dispatchMetadata, ); return; } const newUserIDs = extractUserIDsFromPayload(dmOperationValidator, dmOp); if (newUserIDs.length > 0) { dispatch({ type: processNewUserIDsActionType, payload: { userIDs: newUserIDs }, }); } const dmOpSpec = dmOpSpecs[dmOp.type]; const notificationsCreationDataPromise: Promise = (async () => { if ( dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND || !dmOpSpec.notificationsCreationData ) { return null; } return await dmOpSpec.notificationsCreationData(dmOp, utilities); })(); const [ { rawMessageInfos, updateInfos, blobOps }, notificationsCreationData, ] = await Promise.all([ dmOpSpec.processDMOperation(dmOp, utilities), notificationsCreationDataPromise, ]); const holderOps = blobOps .map(({ dmOpType, ...holderOp }) => { if ( (dmOpType === 'inbound_only' && dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND) || (dmOpType === 'outbound_only' && dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND) ) { return null; } return holderOp; }) .filter(Boolean); void processBlobHolders(holderOps); - const newUpdateInfos = getThreadUpdatesForNewMessages( - rawMessageInfos, - updateInfos, - utilities, - ); - - updateInfos.push(...newUpdateInfos); - dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos, updateInfos, outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, dispatchMetadata, ); }, [ baseUtilities, processBlobHolders, dispatchWithMetadata, createMessagesToPeersFromDMOp, dispatch, ], ); } function useProcessAndSendDMOperation(): ( dmOperationSpecification: OutboundDMOperationSpecification, ) => Promise { const processDMOps = useProcessDMOperation(); const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); return React.useCallback( async (dmOperationSpecification: OutboundDMOperationSpecification) => { const { promise, dmOpID } = getDMOpsSendingPromise(); await processDMOps(dmOperationSpecification, dmOpID); await promise; }, [getDMOpsSendingPromise, processDMOps], ); } function useSendComposableDMOperation(): ( dmOperationSpecification: OutboundComposableDMOperationSpecification, ) => Promise { const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); const dispatchWithMetadata = useDispatchWithMetadata(); const baseUtilities = useSendDMOperationUtils(); const { processOutboundMessages } = usePeerToPeerCommunication(); const localMessageInfos = useSelector(state => state.messageStore.local); const createMessagesToPeersFromDMOp = useCreateMessagesToPeersFromDMOp(); return React.useCallback( async ( dmOperationSpecification: OutboundComposableDMOperationSpecification, ): Promise => { const { viewerID, ...restUtilities } = baseUtilities; if (!viewerID) { console.log('ignored DMOperation because logged out'); return { result: 'failure', failedMessageIDs: [], }; } const utilities: ProcessDMOperationUtilities = { ...restUtilities, viewerID, }; const { promise, dmOpID } = getDMOpsSendingPromise(); const { op, composableMessageID, recipients } = dmOperationSpecification; const localMessageInfo = localMessageInfos[composableMessageID]; if ( localMessageInfo?.outboundP2PMessageIDs && localMessageInfo.outboundP2PMessageIDs.length > 0 ) { processOutboundMessages(localMessageInfo.outboundP2PMessageIDs, dmOpID); try { // This code should never throw. return await promise; } catch (e) { invariant( localMessageInfo.outboundP2PMessageIDs, 'outboundP2PMessageIDs should be defined', ); return { result: 'failure', failedMessageIDs: localMessageInfo.outboundP2PMessageIDs, }; } } const outboundP2PMessages = await createMessagesToPeersFromDMOp( op, recipients, ); const spec = dmOpSpecs[op.type]; const notificationsCreationDataPromise: Promise = (async () => { if (!spec?.notificationsCreationData) { return null; } return await spec.notificationsCreationData(op, utilities); })(); const [{ rawMessageInfos, updateInfos }, notificationsCreationData] = await Promise.all([ dmOpSpecs[op.type].processDMOperation(op, utilities), notificationsCreationDataPromise, ]); - const newUpdateInfos = getThreadUpdatesForNewMessages( - rawMessageInfos, - updateInfos, - utilities, - ); - dispatchWithMetadata( { type: processDMOpsActionType, payload: { - rawMessageInfos: [], - updateInfos: newUpdateInfos, + rawMessageInfos, + updateInfos, outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, { dmOpID, }, ); try { // This code should never throw. return await promise; } catch (e) { return { result: 'failure', failedMessageIDs: outboundP2PMessages.map( message => message.messageID, ), }; } }, [ baseUtilities, getDMOpsSendingPromise, localMessageInfos, createMessagesToPeersFromDMOp, dispatchWithMetadata, processOutboundMessages, ], ); } export { useProcessDMOperation, useProcessAndSendDMOperation, useSendComposableDMOperation, };