diff --git a/keyserver/src/creators/update-creator.js b/keyserver/src/creators/update-creator.js index a02e2582a..3529f8d65 100644 --- a/keyserver/src/creators/update-creator.js +++ b/keyserver/src/creators/update-creator.js @@ -1,769 +1,661 @@ // @flow import invariant from 'invariant'; import { nonThreadCalendarFilters } from 'lib/selectors/calendar-filter-selectors.js'; import { keyForUpdateData, keyForUpdateInfo, rawUpdateInfoFromUpdateData, } from 'lib/shared/update-utils.js'; +import type { UpdateInfosRawData } from 'lib/shared/updates/update-spec.js'; import { updateSpecs } from 'lib/shared/updates/update-specs.js'; import { - type RawEntryInfos, - type FetchEntryInfosBase, type CalendarQuery, defaultCalendarQuery, } from 'lib/types/entry-types.js'; import { defaultNumberPerThread, - type FetchMessageInfosResult, type MessageSelectionCriteria, } from 'lib/types/message-types.js'; import { type UpdateTarget, redisMessageTypes, type NewUpdatesRedisMessage, } from 'lib/types/redis-types.js'; -import type { RawThreadInfo, RawThreadInfos } from 'lib/types/thread-types.js'; +import type { RawThreadInfo } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { type ServerUpdateInfo, type UpdateData, type RawUpdateInfo, type CreateUpdatesResult, } from 'lib/types/update-types.js'; -import type { UserInfos, LoggedInUserInfo } from 'lib/types/user-types.js'; +import type { UserInfos } from 'lib/types/user-types.js'; import { promiseAll } from 'lib/utils/promises.js'; import createIDs from './id-creator.js'; import { dbQuery, SQL, mergeAndConditions } from '../database/database.js'; import type { SQLStatementType } from '../database/types.js'; import { deleteUpdatesByConditions } from '../deleters/update-deleters.js'; import { fetchEntryInfos, fetchEntryInfosByID, } from '../fetchers/entry-fetchers.js'; import { fetchMessageInfos } from '../fetchers/message-fetchers.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchKnownUserInfos, fetchCurrentUserInfo, } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { channelNameForUpdateTarget, publisher } from '../socket/redis.js'; export type UpdatesForCurrentSession = // This is the default if no Viewer is passed, or if an isSocket Viewer is // passed in. We will broadcast to all valid sessions via Redis and return // nothing to the caller, relying on the current session's Redis listener to // pick up the updates and deliver them asynchronously. | 'broadcast' // This is the default if a non-isSocket Viewer is passed in. We avoid // broadcasting the update to the current session, and instead return the // update to the caller, who will handle delivering it to the client. | 'return' // This means we ignore any updates destined for the current session. // Presumably the caller knows what they are doing and has a different way of // communicating the relevant information to the client. | 'ignore'; type DeleteCondition = { +userID: string, +target: ?string, +types: 'all_types' | $ReadOnlySet, }; export type ViewerInfo = | { viewer: Viewer, calendarQuery?: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, } | { viewer: Viewer, calendarQuery: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, threadInfos: { +[id: string]: RawThreadInfo }, }; const defaultUpdateCreationResult = { viewerUpdates: [], userInfos: {} }; const sortFunction = ( a: UpdateData | ServerUpdateInfo, b: UpdateData | ServerUpdateInfo, ) => a.time - b.time; const deleteUpdatesBatchSize = 500; // Creates rows in the updates table based on the inputed updateDatas. Returns // UpdateInfos pertaining to the provided viewerInfo, as well as related // UserInfos. If no viewerInfo is provided, no UpdateInfos will be returned. And // the update row won't have an updater column, meaning no session will be // excluded from the update. async function createUpdates( updateDatas: $ReadOnlyArray, passedViewerInfo?: ?ViewerInfo, ): Promise { if (updateDatas.length === 0) { return defaultUpdateCreationResult; } // viewer.session will throw for a script Viewer let viewerInfo = passedViewerInfo; if ( viewerInfo && (viewerInfo.viewer.isScriptViewer || !viewerInfo.viewer.loggedIn) ) { viewerInfo = null; } const sortedUpdateDatas = [...updateDatas].sort(sortFunction); const filteredUpdateDatas: UpdateData[] = []; const keyedUpdateDatas: Map = new Map(); for (const updateData of sortedUpdateDatas) { const key = keyForUpdateData(updateData); if (!key) { filteredUpdateDatas.push(updateData); continue; } const conditionKey = `${updateData.userID}|${key}`; const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} has conditionKey ` + `${conditionKey} but no deleteCondition`, ); const curUpdateDatas = keyedUpdateDatas.get(conditionKey); if (!curUpdateDatas) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const filteredCurrent = curUpdateDatas.filter(curUpdateData => filterOnDeleteCondition(curUpdateData, deleteCondition), ); if (filteredCurrent.length === 0) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const isNewUpdateDataFiltered = !filteredCurrent.every(curUpdateData => { const curDeleteCondition = getDeleteCondition(curUpdateData); invariant( curDeleteCondition, `updateData of type ${curUpdateData.type} is in keyedUpdateDatas ` + "but doesn't have a deleteCondition", ); return filterOnDeleteCondition(updateData, curDeleteCondition); }); if (!isNewUpdateDataFiltered) { filteredCurrent.push(updateData); } keyedUpdateDatas.set(conditionKey, filteredCurrent); } for (const keyUpdateDatas of keyedUpdateDatas.values()) { filteredUpdateDatas.push(...keyUpdateDatas); } const ids = await createIDs('updates', filteredUpdateDatas.length); let updatesForCurrentSession = viewerInfo && viewerInfo.updatesForCurrentSession; if (!updatesForCurrentSession && viewerInfo) { updatesForCurrentSession = viewerInfo.viewer.isSocket ? 'broadcast' : 'return'; } else if (!updatesForCurrentSession) { updatesForCurrentSession = 'broadcast'; } const dontBroadcastSession = updatesForCurrentSession !== 'broadcast' && viewerInfo ? viewerInfo.viewer.session : null; const publishInfos: Map = new Map(); const viewerRawUpdateInfos: RawUpdateInfo[] = []; const insertRows: (?(number | string))[][] = []; const earliestTime: Map = new Map(); for (let i = 0; i < filteredUpdateDatas.length; i++) { const updateData = filteredUpdateDatas[i]; const target = getTargetFromUpdateData(updateData); const rawUpdateInfo = rawUpdateInfoFromUpdateData(updateData, ids[i]); if (!target || !dontBroadcastSession || target !== dontBroadcastSession) { const updateTarget = target ? { userID: updateData.userID, sessionID: target } : { userID: updateData.userID }; const channelName = channelNameForUpdateTarget(updateTarget); let publishInfo = publishInfos.get(channelName); if (!publishInfo) { publishInfo = { updateTarget, rawUpdateInfos: [] }; publishInfos.set(channelName, publishInfo); } publishInfo.rawUpdateInfos.push(rawUpdateInfo); } if ( updatesForCurrentSession === 'return' && viewerInfo && updateData.userID === viewerInfo.viewer.id && (!target || target === viewerInfo.viewer.session) ) { viewerRawUpdateInfos.push(rawUpdateInfo); } if (viewerInfo && target && viewerInfo.viewer.session === target) { // In the case where this update is being created only for the current // session, there's no reason to insert a row into the updates table continue; } const content = updateSpecs[updateData.type].updateContentForServerDB(updateData); const key = keyForUpdateData(updateData); if (key) { const conditionKey = `${updateData.userID}|${key}`; const currentEarliestTime = earliestTime.get(conditionKey); if (!currentEarliestTime || updateData.time < currentEarliestTime) { earliestTime.set(conditionKey, updateData.time); } } const insertRow = [ ids[i], updateData.userID, updateData.type, key, content, updateData.time, dontBroadcastSession, target, ]; insertRows.push(insertRow); } type DeleteUpdatesConditions = { key: string, target?: string, types?: number[], time?: number, }; const usersByConditions: Map< string, { conditions: DeleteUpdatesConditions, users: Set, }, > = new Map(); for (const [conditionKey, keyUpdateDatas] of keyedUpdateDatas) { const deleteConditionByTarget: Map = new Map(); for (const updateData of keyUpdateDatas) { const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} is in keyedUpdateDatas but ` + "doesn't have a deleteCondition", ); const { target, types } = deleteCondition; const existingDeleteCondition = deleteConditionByTarget.get(target); if (!existingDeleteCondition) { deleteConditionByTarget.set(target, deleteCondition); continue; } const existingTypes = existingDeleteCondition.types; if (existingTypes === 'all_types') { continue; } else if (types === 'all_types') { deleteConditionByTarget.set(target, deleteCondition); continue; } const mergedTypes = new Set([...types, ...existingTypes]); deleteConditionByTarget.set(target, { ...deleteCondition, types: mergedTypes, }); } for (const deleteCondition of deleteConditionByTarget.values()) { const { userID, target, types } = deleteCondition; const key = conditionKey.split('|')[1]; const conditions: DeleteUpdatesConditions = { key }; if (target) { conditions.target = target; } if (types !== 'all_types') { invariant(types.size > 0, 'deleteCondition had empty types set'); conditions.types = [...types]; } const earliestTimeForCondition = earliestTime.get(conditionKey); if (earliestTimeForCondition) { conditions.time = earliestTimeForCondition; } const conditionsKey = JSON.stringify(conditions); if (!usersByConditions.has(conditionsKey)) { usersByConditions.set(conditionsKey, { conditions, users: new Set(), }); } usersByConditions.get(conditionsKey)?.users.add(userID); } } const deleteSQLConditions: SQLStatementType[] = []; for (const { conditions, users } of usersByConditions.values()) { const sqlConditions = [ SQL`u.user IN (${[...users]})`, SQL`u.key = ${conditions.key}`, ]; if (conditions.target) { sqlConditions.push(SQL`u.target = ${conditions.target}`); } if (conditions.types) { sqlConditions.push(SQL`u.type IN (${conditions.types})`); } if (conditions.time) { sqlConditions.push(SQL`u.time < ${conditions.time}`); } deleteSQLConditions.push(mergeAndConditions(sqlConditions)); } const promises = {}; if (insertRows.length > 0) { const insertQuery = SQL` INSERT INTO updates(id, user, type, \`key\`, content, time, updater, target) `; insertQuery.append(SQL`VALUES ${insertRows}`); promises.insert = dbQuery(insertQuery); } if (publishInfos.size > 0) { promises.redis = redisPublish(publishInfos.values(), dontBroadcastSession); } if (deleteSQLConditions.length > 0) { promises.delete = (async () => { while (deleteSQLConditions.length > 0) { const batch = deleteSQLConditions.splice(0, deleteUpdatesBatchSize); await deleteUpdatesByConditions(batch); } })(); } if (viewerRawUpdateInfos.length > 0) { invariant(viewerInfo, 'should be set'); promises.updatesResult = fetchUpdateInfosWithRawUpdateInfos( viewerRawUpdateInfos, viewerInfo, ); } const { updatesResult } = await promiseAll(promises); if (!updatesResult) { return defaultUpdateCreationResult; } const { updateInfos, userInfos } = updatesResult; return { viewerUpdates: updateInfos, userInfos }; } export type FetchUpdatesResult = { +updateInfos: $ReadOnlyArray, +userInfos: UserInfos, }; async function fetchUpdateInfosWithRawUpdateInfos( rawUpdateInfos: $ReadOnlyArray, viewerInfo: ViewerInfo, ): Promise { const entitiesToFetch = rawUpdateInfos .map(info => updateSpecs[info.type].entitiesToFetch?.(info)) .filter(Boolean); const currentUserNeedsFetch = entitiesToFetch.some( ({ currentUser }) => currentUser, ); const threadIDsNeedingFetch = viewerInfo.threadInfos ? new Set() : new Set(entitiesToFetch.map(({ threadID }) => threadID).filter(Boolean)); const entryIDsNeedingFetch = new Set( entitiesToFetch.map(({ entryID }) => entryID).filter(Boolean), ); // entries and messages const threadIDsNeedingDetailedFetch = new Set( entitiesToFetch .map(({ detailedThreadID }) => detailedThreadID) .filter(Boolean), ); const promises = {}; const { viewer } = viewerInfo; if (!viewerInfo.threadInfos && threadIDsNeedingFetch.size > 0) { promises.threadResult = fetchThreadInfos(viewer, { threadIDs: threadIDsNeedingFetch, }); } let calendarQuery: ?CalendarQuery = viewerInfo.calendarQuery ? viewerInfo.calendarQuery : null; if (!calendarQuery && viewer.hasSessionInfo) { // This should only ever happen for "legacy" clients who call in without // providing this information. These clients wouldn't know how to deal with // the corresponding UpdateInfos anyways, so no reason to be worried. calendarQuery = viewer.calendarQuery; } else if (!calendarQuery) { calendarQuery = defaultCalendarQuery(viewer.platform, viewer.timeZone); } if (threadIDsNeedingDetailedFetch.size > 0) { const threadCursors: { [string]: ?string } = {}; for (const threadID of threadIDsNeedingDetailedFetch) { threadCursors[threadID] = null; } const messageSelectionCriteria: MessageSelectionCriteria = { threadCursors, }; promises.messageInfosResult = fetchMessageInfos( viewer, messageSelectionCriteria, defaultNumberPerThread, ); const threadCalendarQuery = { ...calendarQuery, filters: [ ...nonThreadCalendarFilters(calendarQuery.filters), { type: 'threads', threadIDs: [...threadIDsNeedingDetailedFetch] }, ], }; promises.calendarResult = fetchEntryInfos(viewer, [threadCalendarQuery]); } if (entryIDsNeedingFetch.size > 0) { promises.entryInfosResult = fetchEntryInfosByID( viewer, entryIDsNeedingFetch, ); } if (currentUserNeedsFetch) { promises.currentUserInfoResult = (async () => { const currentUserInfo = await fetchCurrentUserInfo(viewer); invariant(currentUserInfo.anonymous === undefined, 'should be logged in'); return currentUserInfo; })(); } const { threadResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, } = await promiseAll(promises); let threadInfos = {}; if (viewerInfo.threadInfos) { threadInfos = viewerInfo.threadInfos; } else if (threadResult) { threadInfos = threadResult.threadInfos; } return await updateInfosFromRawUpdateInfos(viewer, rawUpdateInfos, { threadInfos, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, }); } -export type UpdateInfosRawData = { - threadInfos: RawThreadInfos, - messageInfosResult: ?FetchMessageInfosResult, - calendarResult: ?FetchEntryInfosBase, - entryInfosResult: ?RawEntryInfos, - currentUserInfoResult: LoggedInUserInfo, -}; async function updateInfosFromRawUpdateInfos( viewer: Viewer, rawUpdateInfos: $ReadOnlyArray, rawData: UpdateInfosRawData, ): Promise { - const { - threadInfos, - messageInfosResult, - calendarResult, - entryInfosResult, - currentUserInfoResult, - } = rawData; - const updateInfos = []; - const userIDsToFetch = new Set(); + const { messageInfosResult, calendarResult } = rawData; const rawEntryInfosByThreadID = {}; for (const entryInfo of calendarResult?.rawEntryInfos ?? []) { if (!rawEntryInfosByThreadID[entryInfo.threadID]) { rawEntryInfosByThreadID[entryInfo.threadID] = []; } rawEntryInfosByThreadID[entryInfo.threadID].push(entryInfo); } const rawMessageInfosByThreadID = {}; for (const messageInfo of messageInfosResult?.rawMessageInfos ?? []) { if (!rawMessageInfosByThreadID[messageInfo.threadID]) { rawMessageInfosByThreadID[messageInfo.threadID] = []; } rawMessageInfosByThreadID[messageInfo.threadID].push(messageInfo); } - for (const rawUpdateInfo of rawUpdateInfos) { - if (rawUpdateInfo.type === updateTypes.DELETE_ACCOUNT) { - updateInfos.push({ - type: updateTypes.DELETE_ACCOUNT, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - deletedUserID: rawUpdateInfo.deletedUserID, - }); - } else if (rawUpdateInfo.type === updateTypes.UPDATE_THREAD) { - const threadInfo = threadInfos[rawUpdateInfo.threadID]; - if (!threadInfo) { - console.warn( - "failed to hydrate updateTypes.UPDATE_THREAD because we couldn't " + - `fetch RawThreadInfo for ${rawUpdateInfo.threadID}`, - ); - continue; - } - updateInfos.push({ - type: updateTypes.UPDATE_THREAD, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - threadInfo, - }); - } else if (rawUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS) { - updateInfos.push({ - type: updateTypes.UPDATE_THREAD_READ_STATUS, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - threadID: rawUpdateInfo.threadID, - unread: rawUpdateInfo.unread, - }); - } else if (rawUpdateInfo.type === updateTypes.DELETE_THREAD) { - updateInfos.push({ - type: updateTypes.DELETE_THREAD, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - threadID: rawUpdateInfo.threadID, - }); - } else if (rawUpdateInfo.type === updateTypes.JOIN_THREAD) { - const threadInfo = threadInfos[rawUpdateInfo.threadID]; - if (!threadInfo) { - console.warn( - "failed to hydrate updateTypes.JOIN_THREAD because we couldn't " + - `fetch RawThreadInfo for ${rawUpdateInfo.threadID}`, - ); - continue; - } - - invariant(calendarResult, 'should be set'); - const rawEntryInfos = - rawEntryInfosByThreadID[rawUpdateInfo.threadID] ?? []; - invariant(messageInfosResult, 'should be set'); - const rawMessageInfos = - rawMessageInfosByThreadID[rawUpdateInfo.threadID] ?? []; - - updateInfos.push({ - type: updateTypes.JOIN_THREAD, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - threadInfo, - rawMessageInfos, - truncationStatus: - messageInfosResult.truncationStatuses[rawUpdateInfo.threadID], - rawEntryInfos, - }); - } else if (rawUpdateInfo.type === updateTypes.BAD_DEVICE_TOKEN) { - updateInfos.push({ - type: updateTypes.BAD_DEVICE_TOKEN, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - deviceToken: rawUpdateInfo.deviceToken, - }); - } else if (rawUpdateInfo.type === updateTypes.UPDATE_ENTRY) { - invariant(entryInfosResult, 'should be set'); - const entryInfo = entryInfosResult[rawUpdateInfo.entryID]; - if (!entryInfo) { - console.warn( - "failed to hydrate updateTypes.UPDATE_ENTRY because we couldn't " + - `fetch RawEntryInfo for ${rawUpdateInfo.entryID}`, - ); - continue; - } - updateInfos.push({ - type: updateTypes.UPDATE_ENTRY, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - entryInfo, - }); - } else if (rawUpdateInfo.type === updateTypes.UPDATE_CURRENT_USER) { - invariant(currentUserInfoResult, 'should be set'); - updateInfos.push({ - type: updateTypes.UPDATE_CURRENT_USER, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - currentUserInfo: currentUserInfoResult, - }); - } else if (rawUpdateInfo.type === updateTypes.UPDATE_USER) { - updateInfos.push({ - type: updateTypes.UPDATE_USER, - id: rawUpdateInfo.id, - time: rawUpdateInfo.time, - updatedUserID: rawUpdateInfo.updatedUserID, - }); - userIDsToFetch.add(rawUpdateInfo.updatedUserID); - } else { - invariant(false, `unrecognized updateType ${rawUpdateInfo.type}`); - } - } + const userIDsToFetch = new Set( + rawUpdateInfos + .map(update => + update.type === updateTypes.UPDATE_USER ? update.updatedUserID : null, + ) + .filter(Boolean), + ); + const params = { + data: rawData, + rawEntryInfosByThreadID, + rawMessageInfosByThreadID, + }; + const updateInfos = rawUpdateInfos + .map(update => + updateSpecs[update.type].updateInfoFromRawInfo(update, params), + ) + .filter(Boolean); let userInfos = {}; if (userIDsToFetch.size > 0) { userInfos = await fetchKnownUserInfos(viewer, [...userIDsToFetch]); } updateInfos.sort(sortFunction); // Now we'll attempt to merge UpdateInfos so that we only have one per key const updateForKey: Map = new Map(); const mergedUpdates: ServerUpdateInfo[] = []; for (const updateInfo of updateInfos) { const key = keyForUpdateInfo(updateInfo); if (!key) { mergedUpdates.push(updateInfo); continue; } else if ( updateInfo.type === updateTypes.DELETE_THREAD || updateInfo.type === updateTypes.JOIN_THREAD || updateInfo.type === updateTypes.DELETE_ACCOUNT ) { updateForKey.set(key, updateInfo); continue; } const currentUpdateInfo = updateForKey.get(key); if (!currentUpdateInfo) { updateForKey.set(key, updateInfo); } else if ( updateInfo.type === updateTypes.UPDATE_THREAD && currentUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS ) { // UPDATE_THREAD trumps UPDATE_THREAD_READ_STATUS // Note that we keep the oldest UPDATE_THREAD updateForKey.set(key, updateInfo); } else if ( updateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS && currentUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS ) { // If we only have UPDATE_THREAD_READ_STATUS, keep the most recent updateForKey.set(key, updateInfo); } else if (updateInfo.type === updateTypes.UPDATE_ENTRY) { updateForKey.set(key, updateInfo); } else if (updateInfo.type === updateTypes.UPDATE_CURRENT_USER) { updateForKey.set(key, updateInfo); } } for (const [, updateInfo] of updateForKey) { mergedUpdates.push(updateInfo); } mergedUpdates.sort(sortFunction); return { updateInfos: mergedUpdates, userInfos }; } type PublishInfo = { updateTarget: UpdateTarget, rawUpdateInfos: RawUpdateInfo[], }; async function redisPublish( publishInfos: Iterator, dontBroadcastSession: ?string, ): Promise { for (const publishInfo of publishInfos) { const { updateTarget, rawUpdateInfos } = publishInfo; const redisMessage: NewUpdatesRedisMessage = { type: redisMessageTypes.NEW_UPDATES, updates: rawUpdateInfos, }; if (!updateTarget.sessionID && dontBroadcastSession) { redisMessage.ignoreSession = dontBroadcastSession; } publisher.sendMessage(updateTarget, redisMessage); } } function getTargetFromUpdateData(updateData: UpdateData): ?string { if (updateData.targetSession) { return updateData.targetSession; } else if (updateData.targetCookie) { return updateData.targetCookie; } else { return null; } } function getDeleteCondition(updateData: UpdateData): ?DeleteCondition { let types; if (updateData.type === updateTypes.DELETE_ACCOUNT) { types = new Set([updateTypes.DELETE_ACCOUNT, updateTypes.UPDATE_USER]); } else if (updateData.type === updateTypes.UPDATE_THREAD) { types = new Set([ updateTypes.UPDATE_THREAD, updateTypes.UPDATE_THREAD_READ_STATUS, ]); } else if (updateData.type === updateTypes.UPDATE_THREAD_READ_STATUS) { types = new Set([updateTypes.UPDATE_THREAD_READ_STATUS]); } else if ( updateData.type === updateTypes.DELETE_THREAD || updateData.type === updateTypes.JOIN_THREAD ) { types = 'all_types'; } else if (updateData.type === updateTypes.UPDATE_ENTRY) { types = 'all_types'; } else if (updateData.type === updateTypes.UPDATE_CURRENT_USER) { types = new Set([updateTypes.UPDATE_CURRENT_USER]); } else if (updateData.type === updateTypes.UPDATE_USER) { types = new Set([updateTypes.UPDATE_USER]); } else { return null; } const target = getTargetFromUpdateData(updateData); const { userID } = updateData; return { userID, target, types }; } function filterOnDeleteCondition( updateData: UpdateData, deleteCondition: DeleteCondition, ): boolean { invariant( updateData.userID === deleteCondition.userID, `updateData of type ${updateData.type} being compared to wrong userID`, ); if (deleteCondition.target) { const target = getTargetFromUpdateData(updateData); if (target !== deleteCondition.target) { return true; } } if (deleteCondition.types === 'all_types') { return false; } return !deleteCondition.types.has(updateData.type); } export { createUpdates, fetchUpdateInfosWithRawUpdateInfos }; diff --git a/lib/shared/updates/bad-device-token-spec.js b/lib/shared/updates/bad-device-token-spec.js index c429f512a..8526b1246 100644 --- a/lib/shared/updates/bad-device-token-spec.js +++ b/lib/shared/updates/bad-device-token-spec.js @@ -1,29 +1,37 @@ // @flow import type { UpdateSpec } from './update-spec.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { BadDeviceTokenRawUpdateInfo, BadDeviceTokenUpdateData, BadDeviceTokenUpdateInfo, } from '../../types/update-types.js'; export const badDeviceTokenSpec: UpdateSpec< BadDeviceTokenUpdateInfo, BadDeviceTokenRawUpdateInfo, BadDeviceTokenUpdateData, > = Object.freeze({ rawUpdateInfoFromRow(row: Object) { const { deviceToken } = JSON.parse(row.content); return { type: updateTypes.BAD_DEVICE_TOKEN, id: row.id.toString(), time: row.time, deviceToken, }; }, updateContentForServerDB(data: BadDeviceTokenUpdateData) { const { deviceToken } = data; return JSON.stringify({ deviceToken }); }, + updateInfoFromRawInfo(info: BadDeviceTokenRawUpdateInfo) { + return { + type: updateTypes.BAD_DEVICE_TOKEN, + id: info.id, + time: info.time, + deviceToken: info.deviceToken, + }; + }, }); diff --git a/lib/shared/updates/delete-account-spec.js b/lib/shared/updates/delete-account-spec.js index d2c235b25..b25388cdb 100644 --- a/lib/shared/updates/delete-account-spec.js +++ b/lib/shared/updates/delete-account-spec.js @@ -1,64 +1,72 @@ // @flow import type { UpdateSpec } from './update-spec.js'; import type { RawThreadInfos } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { AccountDeletionRawUpdateInfo, AccountDeletionUpdateData, AccountDeletionUpdateInfo, } from '../../types/update-types.js'; import type { UserInfos } from '../../types/user-types.js'; export const deleteAccountSpec: UpdateSpec< AccountDeletionUpdateInfo, AccountDeletionRawUpdateInfo, AccountDeletionUpdateData, > = Object.freeze({ generateOpsForThreadUpdates( storeThreadInfos: RawThreadInfos, update: AccountDeletionUpdateInfo, ) { const operations = []; for (const threadID in storeThreadInfos) { const threadInfo = storeThreadInfos[threadID]; const newMembers = threadInfo.members.filter( member => member.id !== update.deletedUserID, ); if (newMembers.length < threadInfo.members.length) { const updatedThread = { ...threadInfo, members: newMembers, }; operations.push({ type: 'replace', payload: { id: threadID, threadInfo: updatedThread, }, }); } } return operations; }, reduceUserInfos(state: UserInfos, update: AccountDeletionUpdateInfo) { const { deletedUserID } = update; if (!state[deletedUserID]) { return state; } const { [deletedUserID]: deleted, ...rest } = state; return rest; }, rawUpdateInfoFromRow(row: Object) { const content = JSON.parse(row.content); return { type: updateTypes.DELETE_ACCOUNT, id: row.id.toString(), time: row.time, deletedUserID: content.deletedUserID, }; }, updateContentForServerDB(data: AccountDeletionUpdateData) { return JSON.stringify({ deletedUserID: data.deletedUserID }); }, + updateInfoFromRawInfo(info: AccountDeletionRawUpdateInfo) { + return { + type: updateTypes.DELETE_ACCOUNT, + id: info.id, + time: info.time, + deletedUserID: info.deletedUserID, + }; + }, }); diff --git a/lib/shared/updates/delete-thread-spec.js b/lib/shared/updates/delete-thread-spec.js index e99d7efcc..202e2fa48 100644 --- a/lib/shared/updates/delete-thread-spec.js +++ b/lib/shared/updates/delete-thread-spec.js @@ -1,55 +1,63 @@ // @flow import type { UpdateSpec } from './update-spec.js'; import type { RawThreadInfos } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ThreadDeletionRawUpdateInfo, ThreadDeletionUpdateData, ThreadDeletionUpdateInfo, } from '../../types/update-types.js'; export const deleteThreadSpec: UpdateSpec< ThreadDeletionUpdateInfo, ThreadDeletionRawUpdateInfo, ThreadDeletionUpdateData, > = Object.freeze({ generateOpsForThreadUpdates( storeThreadInfos: RawThreadInfos, update: ThreadDeletionUpdateInfo, ) { if (storeThreadInfos[update.threadID]) { return [ { type: 'remove', payload: { ids: [update.threadID], }, }, ]; } return null; }, reduceCalendarThreadFilters( filteredThreadIDs: $ReadOnlySet, update: ThreadDeletionUpdateInfo, ) { if (!filteredThreadIDs.has(update.threadID)) { return filteredThreadIDs; } return new Set([...filteredThreadIDs].filter(id => id !== update.threadID)); }, rawUpdateInfoFromRow(row: Object) { const { threadID } = JSON.parse(row.content); return { type: updateTypes.DELETE_THREAD, id: row.id.toString(), time: row.time, threadID, }; }, updateContentForServerDB(data: ThreadDeletionUpdateData) { const { threadID } = data; return JSON.stringify({ threadID }); }, + updateInfoFromRawInfo(info: ThreadDeletionRawUpdateInfo) { + return { + type: updateTypes.DELETE_THREAD, + id: info.id, + time: info.time, + threadID: info.threadID, + }; + }, }); diff --git a/lib/shared/updates/join-thread-spec.js b/lib/shared/updates/join-thread-spec.js index 25695ad2a..bae49db12 100644 --- a/lib/shared/updates/join-thread-spec.js +++ b/lib/shared/updates/join-thread-spec.js @@ -1,111 +1,142 @@ // @flow +import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; -import type { UpdateSpec } from './update-spec.js'; +import type { UpdateInfoFromRawInfoParams, UpdateSpec } from './update-spec.js'; import type { RawEntryInfo } from '../../types/entry-types.js'; import type { RawMessageInfo, MessageTruncationStatuses, } from '../../types/message-types.js'; import type { RawThreadInfos } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ThreadJoinUpdateInfo, ThreadJoinRawUpdateInfo, ThreadJoinUpdateData, } from '../../types/update-types.js'; import { combineTruncationStatuses } from '../message-utils.js'; import { threadInFilterList } from '../thread-utils.js'; export const joinThreadSpec: UpdateSpec< ThreadJoinUpdateInfo, ThreadJoinRawUpdateInfo, ThreadJoinUpdateData, > = Object.freeze({ generateOpsForThreadUpdates( storeThreadInfos: RawThreadInfos, update: ThreadJoinUpdateInfo, ) { if (_isEqual(storeThreadInfos[update.threadInfo.id])(update.threadInfo)) { return null; } return [ { type: 'replace', payload: { id: update.threadInfo.id, threadInfo: update.threadInfo, }, }, ]; }, mergeEntryInfos( entryIDs: Set, mergedEntryInfos: Array, update: ThreadJoinUpdateInfo, ) { for (const entryInfo of update.rawEntryInfos) { const entryID = entryInfo.id; if (!entryID || entryIDs.has(entryID)) { continue; } mergedEntryInfos.push(entryInfo); entryIDs.add(entryID); } }, reduceCalendarThreadFilters( filteredThreadIDs: $ReadOnlySet, update: ThreadJoinUpdateInfo, ) { if ( !threadInFilterList(update.threadInfo) || filteredThreadIDs.has(update.threadInfo.id) ) { return filteredThreadIDs; } return new Set([...filteredThreadIDs, update.threadInfo.id]); }, getRawMessageInfos(update: ThreadJoinUpdateInfo) { return update.rawMessageInfos; }, mergeMessageInfosAndTruncationStatuses( messageIDs: Set, messageInfos: Array, truncationStatuses: MessageTruncationStatuses, update: ThreadJoinUpdateInfo, ) { for (const messageInfo of update.rawMessageInfos) { const messageID = messageInfo.id; if (!messageID || messageIDs.has(messageID)) { continue; } messageInfos.push(messageInfo); messageIDs.add(messageID); } truncationStatuses[update.threadInfo.id] = combineTruncationStatuses( update.truncationStatus, truncationStatuses[update.threadInfo.id], ); }, rawUpdateInfoFromRow(row: Object) { const { threadID } = JSON.parse(row.content); return { type: updateTypes.JOIN_THREAD, id: row.id.toString(), time: row.time, threadID, }; }, updateContentForServerDB(data: ThreadJoinUpdateData) { const { threadID } = data; return JSON.stringify({ threadID }); }, entitiesToFetch(update: ThreadJoinRawUpdateInfo) { return { threadID: update.threadID, detailedThreadID: update.threadID, }; }, + updateInfoFromRawInfo( + info: ThreadJoinRawUpdateInfo, + params: UpdateInfoFromRawInfoParams, + ) { + const { data, rawEntryInfosByThreadID, rawMessageInfosByThreadID } = params; + const { threadInfos, calendarResult, messageInfosResult } = data; + const threadInfo = threadInfos[info.threadID]; + if (!threadInfo) { + console.warn( + "failed to hydrate updateTypes.JOIN_THREAD because we couldn't " + + `fetch RawThreadInfo for ${info.threadID}`, + ); + return null; + } + + invariant(calendarResult, 'should be set'); + const rawEntryInfos = rawEntryInfosByThreadID[info.threadID] ?? []; + invariant(messageInfosResult, 'should be set'); + const rawMessageInfos = rawMessageInfosByThreadID[info.threadID] ?? []; + + return { + type: updateTypes.JOIN_THREAD, + id: info.id, + time: info.time, + threadInfo, + rawMessageInfos, + truncationStatus: messageInfosResult.truncationStatuses[info.threadID], + rawEntryInfos, + }; + }, }); diff --git a/lib/shared/updates/update-current-user-spec.js b/lib/shared/updates/update-current-user-spec.js index 60170f4bf..7c94fd6f0 100644 --- a/lib/shared/updates/update-current-user-spec.js +++ b/lib/shared/updates/update-current-user-spec.js @@ -1,41 +1,55 @@ // @flow +import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; -import type { UpdateSpec } from './update-spec.js'; +import type { UpdateInfoFromRawInfoParams, UpdateSpec } from './update-spec.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { CurrentUserUpdateInfo, CurrentUserRawUpdateInfo, CurrentUserUpdateData, } from '../../types/update-types.js'; import type { CurrentUserInfo } from '../../types/user-types.js'; export const updateCurrentUserSpec: UpdateSpec< CurrentUserUpdateInfo, CurrentUserRawUpdateInfo, CurrentUserUpdateData, > = Object.freeze({ reduceCurrentUser(state: ?CurrentUserInfo, update: CurrentUserUpdateInfo) { if (!_isEqual(update.currentUserInfo)(state)) { return update.currentUserInfo; } return state; }, rawUpdateInfoFromRow(row: Object) { return { type: updateTypes.UPDATE_CURRENT_USER, id: row.id.toString(), time: row.time, }; }, updateContentForServerDB() { // user column contains all the info we need to construct the UpdateInfo return null; }, entitiesToFetch() { return { currentUser: true, }; }, + updateInfoFromRawInfo( + info: CurrentUserRawUpdateInfo, + params: UpdateInfoFromRawInfoParams, + ) { + const { currentUserInfoResult } = params.data; + invariant(currentUserInfoResult, 'should be set'); + return { + type: updateTypes.UPDATE_CURRENT_USER, + id: info.id, + time: info.time, + currentUserInfo: currentUserInfoResult, + }; + }, }); diff --git a/lib/shared/updates/update-entry-spec.js b/lib/shared/updates/update-entry-spec.js index 076e3d3cb..7338d1106 100644 --- a/lib/shared/updates/update-entry-spec.js +++ b/lib/shared/updates/update-entry-spec.js @@ -1,48 +1,71 @@ // @flow -import type { UpdateSpec } from './update-spec.js'; +import invariant from 'invariant'; + +import type { UpdateInfoFromRawInfoParams, UpdateSpec } from './update-spec.js'; import type { RawEntryInfo } from '../../types/entry-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { EntryUpdateInfo, EntryRawUpdateInfo, EntryUpdateData, } from '../../types/update-types.js'; export const updateEntrySpec: UpdateSpec< EntryUpdateInfo, EntryRawUpdateInfo, EntryUpdateData, > = Object.freeze({ mergeEntryInfos( entryIDs: Set, mergedEntryInfos: Array, update: EntryUpdateInfo, ) { const { entryInfo } = update; const entryID = entryInfo.id; if (!entryID || entryIDs.has(entryID)) { return; } mergedEntryInfos.push(entryInfo); entryIDs.add(entryID); }, rawUpdateInfoFromRow(row: Object) { const { entryID } = JSON.parse(row.content); return { type: updateTypes.UPDATE_ENTRY, id: row.id.toString(), time: row.time, entryID, }; }, updateContentForServerDB(data: EntryUpdateData) { const { entryID } = data; return JSON.stringify({ entryID }); }, entitiesToFetch(update: EntryRawUpdateInfo) { return { entryID: update.entryID, }; }, + updateInfoFromRawInfo( + info: EntryRawUpdateInfo, + params: UpdateInfoFromRawInfoParams, + ) { + const { entryInfosResult } = params.data; + invariant(entryInfosResult, 'should be set'); + const entryInfo = entryInfosResult[info.entryID]; + if (!entryInfo) { + console.warn( + "failed to hydrate updateTypes.UPDATE_ENTRY because we couldn't " + + `fetch RawEntryInfo for ${info.entryID}`, + ); + return null; + } + return { + type: updateTypes.UPDATE_ENTRY, + id: info.id, + time: info.time, + entryInfo, + }; + }, }); diff --git a/lib/shared/updates/update-spec.js b/lib/shared/updates/update-spec.js index a2adeb712..f1b1dcfc6 100644 --- a/lib/shared/updates/update-spec.js +++ b/lib/shared/updates/update-spec.js @@ -1,55 +1,86 @@ // @flow import type { ThreadStoreOperation } from '../../ops/thread-store-ops.js'; -import type { RawEntryInfo } from '../../types/entry-types.js'; +import type { + FetchEntryInfosBase, + RawEntryInfo, + RawEntryInfos, +} from '../../types/entry-types.js'; import type { RawMessageInfo, MessageTruncationStatuses, + FetchMessageInfosResult, } from '../../types/message-types.js'; import type { RawThreadInfos } from '../../types/thread-types.js'; import type { ClientUpdateInfo, RawUpdateInfo, UpdateData, } from '../../types/update-types.js'; -import type { CurrentUserInfo, UserInfos } from '../../types/user-types.js'; +import type { + CurrentUserInfo, + LoggedInUserInfo, + UserInfos, +} from '../../types/user-types.js'; + +export type UpdateInfosRawData = { + +threadInfos: RawThreadInfos, + +messageInfosResult: ?FetchMessageInfosResult, + +calendarResult: ?FetchEntryInfosBase, + +entryInfosResult: ?RawEntryInfos, + +currentUserInfoResult: LoggedInUserInfo, +}; + +export type UpdateInfoFromRawInfoParams = { + +data: UpdateInfosRawData, + +rawEntryInfosByThreadID: { + +[id: string]: $ReadOnlyArray, + }, + +rawMessageInfosByThreadID: { + +[id: string]: $ReadOnlyArray, + }, +}; export type UpdateSpec< UpdateInfo: ClientUpdateInfo, RawInfo: RawUpdateInfo, Data: UpdateData, > = { +generateOpsForThreadUpdates?: ( storeThreadInfos: RawThreadInfos, update: UpdateInfo, ) => ?$ReadOnlyArray, +mergeEntryInfos?: ( entryIDs: Set, mergedEntryInfos: Array, update: UpdateInfo, ) => void, +reduceCurrentUser?: ( state: ?CurrentUserInfo, update: UpdateInfo, ) => ?CurrentUserInfo, +reduceUserInfos?: (state: UserInfos, update: UpdateInfo) => UserInfos, +reduceCalendarThreadFilters?: ( filteredThreadIDs: $ReadOnlySet, update: UpdateInfo, ) => $ReadOnlySet, +getRawMessageInfos?: (update: UpdateInfo) => $ReadOnlyArray, +mergeMessageInfosAndTruncationStatuses?: ( messageIDs: Set, messageInfos: Array, truncationStatuses: MessageTruncationStatuses, update: UpdateInfo, ) => void, +rawUpdateInfoFromRow: (row: Object) => RawInfo, +updateContentForServerDB: (data: Data) => ?string, +entitiesToFetch?: (update: RawInfo) => { +threadID?: string, +detailedThreadID?: string, +entryID?: string, +currentUser?: boolean, }, + +updateInfoFromRawInfo: ( + info: RawInfo, + params: UpdateInfoFromRawInfoParams, + ) => ?UpdateInfo, }; diff --git a/lib/shared/updates/update-thread-read-status-spec.js b/lib/shared/updates/update-thread-read-status-spec.js index e1426da73..bb27b8973 100644 --- a/lib/shared/updates/update-thread-read-status-spec.js +++ b/lib/shared/updates/update-thread-read-status-spec.js @@ -1,62 +1,71 @@ // @flow import type { UpdateSpec } from './update-spec.js'; import type { RawThreadInfo, RawThreadInfos, } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ThreadReadStatusUpdateInfo, ThreadReadStatusRawUpdateInfo, ThreadReadStatusUpdateData, } from '../../types/update-types.js'; export const updateThreadReadStatusSpec: UpdateSpec< ThreadReadStatusUpdateInfo, ThreadReadStatusRawUpdateInfo, ThreadReadStatusUpdateData, > = Object.freeze({ generateOpsForThreadUpdates( storeThreadInfos: RawThreadInfos, update: ThreadReadStatusUpdateInfo, ) { const storeThreadInfo: ?RawThreadInfo = storeThreadInfos[update.threadID]; if ( !storeThreadInfo || storeThreadInfo.currentUser.unread === update.unread ) { return null; } const updatedThread = { ...storeThreadInfo, currentUser: { ...storeThreadInfo.currentUser, unread: update.unread, }, }; return [ { type: 'replace', payload: { id: update.threadID, threadInfo: updatedThread, }, }, ]; }, rawUpdateInfoFromRow(row: Object) { const { threadID, unread } = JSON.parse(row.content); return { type: updateTypes.UPDATE_THREAD_READ_STATUS, id: row.id.toString(), time: row.time, threadID, unread, }; }, updateContentForServerDB(data: ThreadReadStatusUpdateData) { const { threadID, unread } = data; return JSON.stringify({ threadID, unread }); }, + updateInfoFromRawInfo(info: ThreadReadStatusRawUpdateInfo) { + return { + type: updateTypes.UPDATE_THREAD_READ_STATUS, + id: info.id, + time: info.time, + threadID: info.threadID, + unread: info.unread, + }; + }, }); diff --git a/lib/shared/updates/update-thread-spec.js b/lib/shared/updates/update-thread-spec.js index b354bc818..c188bef69 100644 --- a/lib/shared/updates/update-thread-spec.js +++ b/lib/shared/updates/update-thread-spec.js @@ -1,68 +1,87 @@ // @flow import _isEqual from 'lodash/fp/isEqual.js'; -import type { UpdateSpec } from './update-spec.js'; +import type { UpdateInfoFromRawInfoParams, UpdateSpec } from './update-spec.js'; import type { RawThreadInfos } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ThreadUpdateInfo, ThreadRawUpdateInfo, ThreadUpdateData, } from '../../types/update-types.js'; import { threadInFilterList } from '../thread-utils.js'; export const updateThreadSpec: UpdateSpec< ThreadUpdateInfo, ThreadRawUpdateInfo, ThreadUpdateData, > = Object.freeze({ generateOpsForThreadUpdates( storeThreadInfos: RawThreadInfos, update: ThreadUpdateInfo, ) { if (_isEqual(storeThreadInfos[update.threadInfo.id])(update.threadInfo)) { return null; } return [ { type: 'replace', payload: { id: update.threadInfo.id, threadInfo: update.threadInfo, }, }, ]; }, reduceCalendarThreadFilters( filteredThreadIDs: $ReadOnlySet, update: ThreadUpdateInfo, ) { if ( threadInFilterList(update.threadInfo) || !filteredThreadIDs.has(update.threadInfo.id) ) { return filteredThreadIDs; } return new Set( [...filteredThreadIDs].filter(id => id !== update.threadInfo.id), ); }, rawUpdateInfoFromRow(row: Object) { const { threadID } = JSON.parse(row.content); return { type: updateTypes.UPDATE_THREAD, id: row.id.toString(), time: row.time, threadID, }; }, updateContentForServerDB(data: ThreadUpdateData) { return JSON.stringify({ threadID: data.threadID }); }, entitiesToFetch(update: ThreadRawUpdateInfo) { return { threadID: update.threadID, }; }, + updateInfoFromRawInfo( + info: ThreadRawUpdateInfo, + params: UpdateInfoFromRawInfoParams, + ) { + const threadInfo = params.data.threadInfos[info.threadID]; + if (!threadInfo) { + console.warn( + "failed to hydrate updateTypes.UPDATE_THREAD because we couldn't " + + `fetch RawThreadInfo for ${info.threadID}`, + ); + return null; + } + return { + type: updateTypes.UPDATE_THREAD, + id: info.id, + time: info.time, + threadInfo, + }; + }, }); diff --git a/lib/shared/updates/update-user-spec.js b/lib/shared/updates/update-user-spec.js index 412a1effc..e34a58e6c 100644 --- a/lib/shared/updates/update-user-spec.js +++ b/lib/shared/updates/update-user-spec.js @@ -1,29 +1,37 @@ // @flow import type { UpdateSpec } from './update-spec.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { UserUpdateInfo, UserRawUpdateInfo, UserUpdateData, } from '../../types/update-types.js'; export const updateUserSpec: UpdateSpec< UserUpdateInfo, UserRawUpdateInfo, UserUpdateData, > = Object.freeze({ rawUpdateInfoFromRow(row: Object) { const content = JSON.parse(row.content); return { type: updateTypes.UPDATE_USER, id: row.id.toString(), time: row.time, updatedUserID: content.updatedUserID, }; }, updateContentForServerDB(data: UserUpdateData) { const { updatedUserID } = data; return JSON.stringify({ updatedUserID }); }, + updateInfoFromRawInfo(info: UserRawUpdateInfo) { + return { + type: updateTypes.UPDATE_USER, + id: info.id, + time: info.time, + updatedUserID: info.updatedUserID, + }; + }, });