diff --git a/lib/types/message-types.js b/lib/types/message-types.js index 1bf1b9033..6d79fd076 100644 --- a/lib/types/message-types.js +++ b/lib/types/message-types.js @@ -1,517 +1,517 @@ // @flow import invariant from 'invariant'; import type { FetchResultInfoInterface } from '../utils/fetch-json'; import { type ClientDBMediaInfo } from './media-types'; import type { AddMembersMessageData, AddMembersMessageInfo, RawAddMembersMessageInfo, } from './messages/add-members'; import type { ChangeRoleMessageData, ChangeRoleMessageInfo, RawChangeRoleMessageInfo, } from './messages/change-role'; import type { ChangeSettingsMessageData, ChangeSettingsMessageInfo, RawChangeSettingsMessageInfo, } from './messages/change-settings'; import type { CreateEntryMessageData, CreateEntryMessageInfo, RawCreateEntryMessageInfo, } from './messages/create-entry'; import type { CreateSidebarMessageData, CreateSidebarMessageInfo, RawCreateSidebarMessageInfo, } from './messages/create-sidebar'; import type { CreateSubthreadMessageData, CreateSubthreadMessageInfo, RawCreateSubthreadMessageInfo, } from './messages/create-subthread'; import type { CreateThreadMessageData, CreateThreadMessageInfo, RawCreateThreadMessageInfo, } from './messages/create-thread'; import type { DeleteEntryMessageData, DeleteEntryMessageInfo, RawDeleteEntryMessageInfo, } from './messages/delete-entry'; import type { EditEntryMessageData, EditEntryMessageInfo, RawEditEntryMessageInfo, } from './messages/edit-entry'; import type { ImagesMessageData, ImagesMessageInfo, RawImagesMessageInfo, } from './messages/images'; import type { JoinThreadMessageData, JoinThreadMessageInfo, RawJoinThreadMessageInfo, } from './messages/join-thread'; import type { LeaveThreadMessageData, LeaveThreadMessageInfo, RawLeaveThreadMessageInfo, } from './messages/leave-thread'; import type { MediaMessageData, MediaMessageInfo, RawMediaMessageInfo, } from './messages/media'; import type { RawRemoveMembersMessageInfo, RemoveMembersMessageData, RemoveMembersMessageInfo, } from './messages/remove-members'; import type { RawRestoreEntryMessageInfo, RestoreEntryMessageData, RestoreEntryMessageInfo, } from './messages/restore-entry'; import type { RawTextMessageInfo, TextMessageData, TextMessageInfo, } from './messages/text'; import type { RawUnsupportedMessageInfo, UnsupportedMessageInfo, } from './messages/unsupported'; import type { RawUpdateRelationshipMessageInfo, UpdateRelationshipMessageData, UpdateRelationshipMessageInfo, } from './messages/update-relationship'; import { type RelativeUserInfo, type UserInfos } from './user-types'; export const messageTypes = Object.freeze({ TEXT: 0, CREATE_THREAD: 1, ADD_MEMBERS: 2, CREATE_SUB_THREAD: 3, CHANGE_SETTINGS: 4, REMOVE_MEMBERS: 5, CHANGE_ROLE: 6, LEAVE_THREAD: 7, JOIN_THREAD: 8, CREATE_ENTRY: 9, EDIT_ENTRY: 10, DELETE_ENTRY: 11, RESTORE_ENTRY: 12, // When the server has a message to deliver that the client can't properly // render because the client is too old, the server will send this message // type instead. Consequently, there is no MessageData for UNSUPPORTED - just // a RawMessageInfo and a MessageInfo. Note that native/persist.js handles // converting these MessageInfos when the client is upgraded. UNSUPPORTED: 13, IMAGES: 14, MULTIMEDIA: 15, UPDATE_RELATIONSHIP: 16, SIDEBAR_SOURCE: 17, CREATE_SIDEBAR: 18, }); export type MessageType = $Values; export function assertMessageType(ourMessageType: number): MessageType { invariant( ourMessageType === 0 || ourMessageType === 1 || ourMessageType === 2 || ourMessageType === 3 || ourMessageType === 4 || ourMessageType === 5 || ourMessageType === 6 || ourMessageType === 7 || ourMessageType === 8 || ourMessageType === 9 || ourMessageType === 10 || ourMessageType === 11 || ourMessageType === 12 || ourMessageType === 13 || ourMessageType === 14 || ourMessageType === 15 || ourMessageType === 16 || ourMessageType === 17 || ourMessageType === 18, 'number is not MessageType enum', ); return ourMessageType; } const composableMessageTypes = new Set([ messageTypes.TEXT, messageTypes.IMAGES, messageTypes.MULTIMEDIA, ]); export function isComposableMessageType(ourMessageType: MessageType): boolean { return composableMessageTypes.has(ourMessageType); } export function assertComposableMessageType( ourMessageType: MessageType, ): MessageType { invariant( isComposableMessageType(ourMessageType), 'MessageType is not composed', ); return ourMessageType; } export function assertComposableRawMessage( message: RawMessageInfo, ): RawComposableMessageInfo { invariant( message.type === messageTypes.TEXT || message.type === messageTypes.IMAGES || message.type === messageTypes.MULTIMEDIA, 'Message is not composable', ); return message; } export function messageDataLocalID(messageData: MessageData): ?string { if ( messageData.type !== messageTypes.TEXT && messageData.type !== messageTypes.IMAGES && messageData.type !== messageTypes.MULTIMEDIA ) { return null; } return messageData.localID; } const mediaMessageTypes = new Set([ messageTypes.IMAGES, messageTypes.MULTIMEDIA, ]); export function isMediaMessageType(ourMessageType: MessageType): boolean { return mediaMessageTypes.has(ourMessageType); } export function assetMediaMessageType( ourMessageType: MessageType, ): MessageType { invariant(isMediaMessageType(ourMessageType), 'MessageType is not media'); return ourMessageType; } // *MessageData = passed to createMessages function to insert into database // Raw*MessageInfo = used by server, and contained in client's local store // *MessageInfo = used by client in UI code export type SidebarSourceMessageData = { +type: 17, +threadID: string, +creatorID: string, +time: number, +sourceMessage?: RawComposableMessageInfo | RawRobotextMessageInfo, }; export type MessageData = | TextMessageData | CreateThreadMessageData | AddMembersMessageData | CreateSubthreadMessageData | ChangeSettingsMessageData | RemoveMembersMessageData | ChangeRoleMessageData | LeaveThreadMessageData | JoinThreadMessageData | CreateEntryMessageData | EditEntryMessageData | DeleteEntryMessageData | RestoreEntryMessageData | ImagesMessageData | MediaMessageData | UpdateRelationshipMessageData | SidebarSourceMessageData | CreateSidebarMessageData; export type MultimediaMessageData = ImagesMessageData | MediaMessageData; export type RawMultimediaMessageInfo = | RawImagesMessageInfo | RawMediaMessageInfo; export type RawComposableMessageInfo = | RawTextMessageInfo | RawMultimediaMessageInfo; export type RawRobotextMessageInfo = | RawCreateThreadMessageInfo | RawAddMembersMessageInfo | RawCreateSubthreadMessageInfo | RawChangeSettingsMessageInfo | RawRemoveMembersMessageInfo | RawChangeRoleMessageInfo | RawLeaveThreadMessageInfo | RawJoinThreadMessageInfo | RawCreateEntryMessageInfo | RawEditEntryMessageInfo | RawDeleteEntryMessageInfo | RawRestoreEntryMessageInfo | RawUpdateRelationshipMessageInfo | RawCreateSidebarMessageInfo | RawUnsupportedMessageInfo; export type RawSidebarSourceMessageInfo = { ...SidebarSourceMessageData, id: string, }; export type RawMessageInfo = | RawComposableMessageInfo | RawRobotextMessageInfo | RawSidebarSourceMessageInfo; export type LocallyComposedMessageInfo = | ({ ...RawImagesMessageInfo, +localID: string, } & RawImagesMessageInfo) | ({ ...RawMediaMessageInfo, +localID: string, } & RawMediaMessageInfo) | ({ ...RawTextMessageInfo, +localID: string, } & RawTextMessageInfo); export type MultimediaMessageInfo = ImagesMessageInfo | MediaMessageInfo; export type ComposableMessageInfo = TextMessageInfo | MultimediaMessageInfo; export type RobotextMessageInfo = | CreateThreadMessageInfo | AddMembersMessageInfo | CreateSubthreadMessageInfo | ChangeSettingsMessageInfo | RemoveMembersMessageInfo | ChangeRoleMessageInfo | LeaveThreadMessageInfo | JoinThreadMessageInfo | CreateEntryMessageInfo | EditEntryMessageInfo | DeleteEntryMessageInfo | RestoreEntryMessageInfo | UnsupportedMessageInfo | UpdateRelationshipMessageInfo | CreateSidebarMessageInfo; export type PreviewableMessageInfo = | RobotextMessageInfo | MultimediaMessageInfo; export type SidebarSourceMessageInfo = { +type: 17, +id: string, +threadID: string, +creator: RelativeUserInfo, +time: number, +sourceMessage: ComposableMessageInfo | RobotextMessageInfo, }; export type MessageInfo = | ComposableMessageInfo | RobotextMessageInfo | SidebarSourceMessageInfo; export type ThreadMessageInfo = { messageIDs: string[], startReached: boolean, lastNavigatedTo: number, // millisecond timestamp lastPruned: number, // millisecond timestamp }; // Tracks client-local information about a message that hasn't been assigned an // ID by the server yet. As soon as the client gets an ack from the server for // this message, it will clear the LocalMessageInfo. export type LocalMessageInfo = { +sendFailed?: boolean, }; export type MessageStore = { +messages: { +[id: string]: RawMessageInfo }, +threads: { +[threadID: string]: ThreadMessageInfo }, +local: { +[id: string]: LocalMessageInfo }, +currentAsOf: number, }; export type RemoveMessageOperation = { +type: 'remove', +payload: { +ids: $ReadOnlyArray }, }; export type RemoveMessagesForThreadsOperation = { +type: 'remove_messages_for_threads', +payload: { +threadIDs: $ReadOnlyArray }, }; export type ReplaceMessageOperation = { +type: 'replace', +payload: { +id: string, +messageInfo: RawMessageInfo }, }; export type RekeyMessageOperation = { +type: 'rekey', +payload: { +from: string, +to: string }, }; export type RemoveAllMessagesOperation = { +type: 'remove_all', }; export type ClientDBMessageInfo = { +id: string, +local_id: ?string, +thread: string, +user: string, +type: string, +future_type: ?string, +content: ?string, +time: string, +media_infos: ?$ReadOnlyArray, }; export type ClientDBReplaceMessageOperation = { +type: 'replace', +payload: ClientDBMessageInfo, }; export type MessageStoreOperation = | RemoveMessageOperation | ReplaceMessageOperation | RekeyMessageOperation | RemoveMessagesForThreadsOperation | RemoveAllMessagesOperation; export type ClientDBMessageStoreOperation = | RemoveMessageOperation | ClientDBReplaceMessageOperation | RekeyMessageOperation | RemoveMessagesForThreadsOperation | RemoveAllMessagesOperation; export const messageTruncationStatus = Object.freeze({ // EXHAUSTIVE means we've reached the start of the thread. Either the result // set includes the very first message for that thread, or there is nothing // behind the cursor you queried for. Given that the client only ever issues // ranged queries whose range, when unioned with what is in state, represent // the set of all messages for a given thread, we can guarantee that getting // EXHAUSTIVE means the start has been reached. EXHAUSTIVE: 'exhaustive', // TRUNCATED is rare, and means that the server can't guarantee that the // result set for a given thread is contiguous with what the client has in its // state. If the client can't verify the contiguousness itself, it needs to // replace its Redux store's contents with what it is in this payload. // 1) getMessageInfosSince: Result set for thread is equal to max, and the // truncation status isn't EXHAUSTIVE (ie. doesn't include the very first // message). - // 2) getMessageInfos: ThreadSelectionCriteria does not specify cursors, the + // 2) getMessageInfos: MessageSelectionCriteria does not specify cursors, the // result set for thread is equal to max, and the truncation status isn't // EXHAUSTIVE. If cursors are specified, we never return truncated, since // the cursor given us guarantees the contiguousness of the result set. // Note that in the reducer, we can guarantee contiguousness if there is any // intersection between messageIDs in the result set and the set currently in // the Redux store. TRUNCATED: 'truncated', // UNCHANGED means the result set is guaranteed to be contiguous with what the // client has in its state, but is not EXHAUSTIVE. Basically, it's anything // that isn't either EXHAUSTIVE or TRUNCATED. UNCHANGED: 'unchanged', }); export type MessageTruncationStatus = $Values; export function assertMessageTruncationStatus( ourMessageTruncationStatus: string, ): MessageTruncationStatus { invariant( ourMessageTruncationStatus === 'truncated' || ourMessageTruncationStatus === 'unchanged' || ourMessageTruncationStatus === 'exhaustive', 'string is not ourMessageTruncationStatus enum', ); return ourMessageTruncationStatus; } export type MessageTruncationStatuses = { [threadID: string]: MessageTruncationStatus, }; export type ThreadCursors = { +[threadID: string]: ?string }; -export type ThreadSelectionCriteria = { +export type MessageSelectionCriteria = { +threadCursors?: ?ThreadCursors, +joinedThreads?: ?boolean, }; export type FetchMessageInfosRequest = { +cursors: ThreadCursors, +numberPerThread?: ?number, }; export type FetchMessageInfosResponse = { +rawMessageInfos: $ReadOnlyArray, +truncationStatuses: MessageTruncationStatuses, +userInfos: UserInfos, }; export type FetchMessageInfosResult = { +rawMessageInfos: $ReadOnlyArray, +truncationStatuses: MessageTruncationStatuses, }; export type FetchMessageInfosPayload = { +threadID: string, +rawMessageInfos: $ReadOnlyArray, +truncationStatus: MessageTruncationStatus, }; export type MessagesResponse = { +rawMessageInfos: $ReadOnlyArray, +truncationStatuses: MessageTruncationStatuses, +currentAsOf: number, }; export const defaultNumberPerThread = 20; export type SendMessageResponse = { +newMessageInfo: RawMessageInfo, }; export type SendMessageResult = { +id: string, +time: number, +interface: FetchResultInfoInterface, }; export type SendMessagePayload = { +localID: string, +serverID: string, +threadID: string, +time: number, +interface: FetchResultInfoInterface, }; export type SendTextMessageRequest = { +threadID: string, +localID?: string, +text: string, }; export type SendMultimediaMessageRequest = { +threadID: string, +localID: string, +mediaIDs: $ReadOnlyArray, }; // Used for the message info included in log-in type actions export type GenericMessagesResult = { +messageInfos: RawMessageInfo[], +truncationStatus: MessageTruncationStatuses, +watchedIDsAtRequestTime: $ReadOnlyArray, +currentAsOf: number, }; export type SaveMessagesPayload = { +rawMessageInfos: $ReadOnlyArray, +updatesCurrentAsOf: number, }; export type NewMessagesPayload = { +messagesResult: MessagesResponse, }; export type MessageStorePrunePayload = { +threadIDs: $ReadOnlyArray, }; diff --git a/server/src/creators/update-creator.js b/server/src/creators/update-creator.js index a3ad73869..046af8817 100644 --- a/server/src/creators/update-creator.js +++ b/server/src/creators/update-creator.js @@ -1,794 +1,794 @@ // @flow import invariant from 'invariant'; import { nonThreadCalendarFilters } from 'lib/selectors/calendar-filter-selectors'; import { keyForUpdateData, keyForUpdateInfo, rawUpdateInfoFromUpdateData, } from 'lib/shared/update-utils'; import { type RawEntryInfo, type FetchEntryInfosBase, type CalendarQuery, defaultCalendarQuery, } from 'lib/types/entry-types'; import { defaultNumberPerThread, type FetchMessageInfosResult, } from 'lib/types/message-types'; import { type UpdateTarget, redisMessageTypes, type NewUpdatesRedisMessage, } from 'lib/types/redis-types'; import type { RawThreadInfo } from 'lib/types/thread-types'; import { type ServerUpdateInfo, type UpdateData, type RawUpdateInfo, type CreateUpdatesResult, updateTypes, } from 'lib/types/update-types'; import type { UserInfos, LoggedInUserInfo, OldLoggedInUserInfo, } from 'lib/types/user-types'; import { promiseAll } from 'lib/utils/promises'; import { dbQuery, SQL, mergeAndConditions } from '../database/database'; import type { SQLStatementType } from '../database/types'; import { deleteUpdatesByConditions } from '../deleters/update-deleters'; import { fetchEntryInfos, fetchEntryInfosByID, } from '../fetchers/entry-fetchers'; import { fetchMessageInfos } from '../fetchers/message-fetchers'; import { fetchThreadInfos, type FetchThreadInfosResult, } from '../fetchers/thread-fetchers'; import { fetchKnownUserInfos, fetchCurrentUserInfo, } from '../fetchers/user-fetchers'; import type { Viewer } from '../session/viewer'; import { channelNameForUpdateTarget, publisher } from '../socket/redis'; import createIDs from './id-creator'; export type UpdatesForCurrentSession = // This is the default if no Viewer is passed, or if an isSocket Viewer is // passed in. We will broadcast to all valid sessions via Redis and return // nothing to the caller, relying on the current session's Redis listener to // pick up the updates and deliver them asynchronously. | 'broadcast' // This is the default if a non-isSocket Viewer is passed in. We avoid // broadcasting the update to the current session, and instead return the // update to the caller, who will handle delivering it to the client. | 'return' // This means we ignore any updates destined for the current session. // Presumably the caller knows what they are doing and has a different way of // communicating the relevant information to the client. | 'ignore'; type DeleteCondition = { +userID: string, +target: ?string, +types: 'all_types' | $ReadOnlySet, }; export type ViewerInfo = | { viewer: Viewer, calendarQuery?: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, } | { viewer: Viewer, calendarQuery: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, threadInfos: { +[id: string]: RawThreadInfo }, }; const defaultUpdateCreationResult = { viewerUpdates: [], userInfos: {} }; const sortFunction = ( a: UpdateData | ServerUpdateInfo, b: UpdateData | ServerUpdateInfo, ) => a.time - b.time; const deleteUpdatesBatchSize = 500; // Creates rows in the updates table based on the inputed updateDatas. Returns // UpdateInfos pertaining to the provided viewerInfo, as well as related // UserInfos. If no viewerInfo is provided, no UpdateInfos will be returned. And // the update row won't have an updater column, meaning no session will be // excluded from the update. async function createUpdates( updateDatas: $ReadOnlyArray, passedViewerInfo?: ?ViewerInfo, ): Promise { if (updateDatas.length === 0) { return defaultUpdateCreationResult; } // viewer.session will throw for a script Viewer let viewerInfo = passedViewerInfo; if ( viewerInfo && (viewerInfo.viewer.isScriptViewer || !viewerInfo.viewer.loggedIn) ) { viewerInfo = null; } const sortedUpdateDatas = [...updateDatas].sort(sortFunction); const filteredUpdateDatas: UpdateData[] = []; const keyedUpdateDatas: Map = new Map(); for (const updateData of sortedUpdateDatas) { const key = keyForUpdateData(updateData); if (!key) { filteredUpdateDatas.push(updateData); continue; } const conditionKey = `${updateData.userID}|${key}`; const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} has conditionKey ` + `${conditionKey} but no deleteCondition`, ); const curUpdateDatas = keyedUpdateDatas.get(conditionKey); if (!curUpdateDatas) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const filteredCurrent = curUpdateDatas.filter(curUpdateData => filterOnDeleteCondition(curUpdateData, deleteCondition), ); if (filteredCurrent.length === 0) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const isNewUpdateDataFiltered = !filteredCurrent.every(curUpdateData => { const curDeleteCondition = getDeleteCondition(curUpdateData); invariant( curDeleteCondition, `updateData of type ${curUpdateData.type} is in keyedUpdateDatas ` + "but doesn't have a deleteCondition", ); return filterOnDeleteCondition(updateData, curDeleteCondition); }); if (!isNewUpdateDataFiltered) { filteredCurrent.push(updateData); } keyedUpdateDatas.set(conditionKey, filteredCurrent); } for (const keyUpdateDatas of keyedUpdateDatas.values()) { filteredUpdateDatas.push(...keyUpdateDatas); } const ids = await createIDs('updates', filteredUpdateDatas.length); let updatesForCurrentSession = viewerInfo && viewerInfo.updatesForCurrentSession; if (!updatesForCurrentSession && viewerInfo) { updatesForCurrentSession = viewerInfo.viewer.isSocket ? 'broadcast' : 'return'; } else if (!updatesForCurrentSession) { updatesForCurrentSession = 'broadcast'; } const dontBroadcastSession = updatesForCurrentSession !== 'broadcast' && viewerInfo ? viewerInfo.viewer.session : null; const publishInfos: Map = new Map(); const viewerRawUpdateInfos: RawUpdateInfo[] = []; const insertRows: (?(number | string))[][] = []; const earliestTime: Map = new Map(); for (let i = 0; i < filteredUpdateDatas.length; i++) { const updateData = filteredUpdateDatas[i]; let content; if (updateData.type === updateTypes.DELETE_ACCOUNT) { content = JSON.stringify({ deletedUserID: updateData.deletedUserID }); } else if (updateData.type === updateTypes.UPDATE_THREAD) { content = JSON.stringify({ threadID: updateData.threadID }); } else if (updateData.type === updateTypes.UPDATE_THREAD_READ_STATUS) { const { threadID, unread } = updateData; content = JSON.stringify({ threadID, unread }); } else if ( updateData.type === updateTypes.DELETE_THREAD || updateData.type === updateTypes.JOIN_THREAD ) { const { threadID } = updateData; content = JSON.stringify({ threadID }); } else if (updateData.type === updateTypes.BAD_DEVICE_TOKEN) { const { deviceToken } = updateData; content = JSON.stringify({ deviceToken }); } else if (updateData.type === updateTypes.UPDATE_ENTRY) { const { entryID } = updateData; content = JSON.stringify({ entryID }); } else if (updateData.type === updateTypes.UPDATE_CURRENT_USER) { // user column contains all the info we need to construct the UpdateInfo content = null; } else if (updateData.type === updateTypes.UPDATE_USER) { const { updatedUserID } = updateData; content = JSON.stringify({ updatedUserID }); } else { invariant(false, `unrecognized updateType ${updateData.type}`); } const target = getTargetFromUpdateData(updateData); const rawUpdateInfo = rawUpdateInfoFromUpdateData(updateData, ids[i]); if (!target || !dontBroadcastSession || target !== dontBroadcastSession) { const updateTarget = target ? { userID: updateData.userID, sessionID: target } : { userID: updateData.userID }; const channelName = channelNameForUpdateTarget(updateTarget); let publishInfo = publishInfos.get(channelName); if (!publishInfo) { publishInfo = { updateTarget, rawUpdateInfos: [] }; publishInfos.set(channelName, publishInfo); } publishInfo.rawUpdateInfos.push(rawUpdateInfo); } if ( updatesForCurrentSession === 'return' && viewerInfo && updateData.userID === viewerInfo.viewer.id && (!target || target === viewerInfo.viewer.session) ) { viewerRawUpdateInfos.push(rawUpdateInfo); } if (viewerInfo && target && viewerInfo.viewer.session === target) { // In the case where this update is being created only for the current // session, there's no reason to insert a row into the updates table continue; } const key = keyForUpdateData(updateData); if (key) { const conditionKey = `${updateData.userID}|${key}`; const currentEarliestTime = earliestTime.get(conditionKey); if (!currentEarliestTime || updateData.time < currentEarliestTime) { earliestTime.set(conditionKey, updateData.time); } } const insertRow = [ ids[i], updateData.userID, updateData.type, key, content, updateData.time, dontBroadcastSession, target, ]; insertRows.push(insertRow); } type DeleteUpdatesConditions = { key: string, target?: string, types?: number[], time?: number, }; const usersByConditions: Map< string, { conditions: DeleteUpdatesConditions, users: Set, }, > = new Map(); for (const [conditionKey, keyUpdateDatas] of keyedUpdateDatas) { const deleteConditionByTarget: Map = new Map(); for (const updateData of keyUpdateDatas) { const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} is in keyedUpdateDatas but ` + "doesn't have a deleteCondition", ); const { target, types } = deleteCondition; const existingDeleteCondition = deleteConditionByTarget.get(target); if (!existingDeleteCondition) { deleteConditionByTarget.set(target, deleteCondition); continue; } const existingTypes = existingDeleteCondition.types; if (existingTypes === 'all_types') { continue; } else if (types === 'all_types') { deleteConditionByTarget.set(target, deleteCondition); continue; } const mergedTypes = new Set([...types, ...existingTypes]); deleteConditionByTarget.set(target, { ...deleteCondition, types: mergedTypes, }); } for (const deleteCondition of deleteConditionByTarget.values()) { const { userID, target, types } = deleteCondition; const key = conditionKey.split('|')[1]; const conditions: DeleteUpdatesConditions = { key }; if (target) { conditions.target = target; } if (types !== 'all_types') { invariant(types.size > 0, 'deleteCondition had empty types set'); conditions.types = [...types]; } const earliestTimeForCondition = earliestTime.get(conditionKey); if (earliestTimeForCondition) { conditions.time = earliestTimeForCondition; } const conditionsKey = JSON.stringify(conditions); if (!usersByConditions.has(conditionsKey)) { usersByConditions.set(conditionsKey, { conditions, users: new Set(), }); } usersByConditions.get(conditionsKey)?.users.add(userID); } } const deleteSQLConditions: SQLStatementType[] = []; for (const { conditions, users } of usersByConditions.values()) { const sqlConditions = [ SQL`u.user IN (${[...users]})`, SQL`u.key = ${conditions.key}`, ]; if (conditions.target) { sqlConditions.push(SQL`u.target = ${conditions.target}`); } if (conditions.types) { sqlConditions.push(SQL`u.type IN (${conditions.types})`); } if (conditions.time) { sqlConditions.push(SQL`u.time < ${conditions.time}`); } deleteSQLConditions.push(mergeAndConditions(sqlConditions)); } const promises = {}; if (insertRows.length > 0) { const insertQuery = SQL` INSERT INTO updates(id, user, type, \`key\`, content, time, updater, target) `; insertQuery.append(SQL`VALUES ${insertRows}`); promises.insert = dbQuery(insertQuery); } if (publishInfos.size > 0) { promises.redis = redisPublish(publishInfos.values(), dontBroadcastSession); } if (deleteSQLConditions.length > 0) { promises.delete = (async () => { while (deleteSQLConditions.length > 0) { const batch = deleteSQLConditions.splice(0, deleteUpdatesBatchSize); await deleteUpdatesByConditions(batch); } })(); } if (viewerRawUpdateInfos.length > 0) { invariant(viewerInfo, 'should be set'); promises.updatesResult = fetchUpdateInfosWithRawUpdateInfos( viewerRawUpdateInfos, viewerInfo, ); } const { updatesResult } = await promiseAll(promises); if (!updatesResult) { return defaultUpdateCreationResult; } const { updateInfos, userInfos } = updatesResult; return { viewerUpdates: updateInfos, userInfos }; } export type FetchUpdatesResult = { +updateInfos: $ReadOnlyArray, +userInfos: UserInfos, }; async function fetchUpdateInfosWithRawUpdateInfos( rawUpdateInfos: $ReadOnlyArray, viewerInfo: ViewerInfo, ): Promise { const { viewer } = viewerInfo; const threadIDsNeedingFetch = new Set(); const entryIDsNeedingFetch = new Set(); let currentUserNeedsFetch = false; const threadIDsNeedingDetailedFetch = new Set(); // entries and messages for (const rawUpdateInfo of rawUpdateInfos) { if ( !viewerInfo.threadInfos && (rawUpdateInfo.type === updateTypes.UPDATE_THREAD || rawUpdateInfo.type === updateTypes.JOIN_THREAD) ) { threadIDsNeedingFetch.add(rawUpdateInfo.threadID); } if (rawUpdateInfo.type === updateTypes.JOIN_THREAD) { threadIDsNeedingDetailedFetch.add(rawUpdateInfo.threadID); } else if (rawUpdateInfo.type === updateTypes.UPDATE_ENTRY) { entryIDsNeedingFetch.add(rawUpdateInfo.entryID); } else if (rawUpdateInfo.type === updateTypes.UPDATE_CURRENT_USER) { currentUserNeedsFetch = true; } } const promises = {}; if (!viewerInfo.threadInfos && threadIDsNeedingFetch.size > 0) { promises.threadResult = fetchThreadInfos( viewer, SQL`t.id IN (${[...threadIDsNeedingFetch]})`, ); } let calendarQuery: ?CalendarQuery = viewerInfo.calendarQuery ? viewerInfo.calendarQuery : null; if (!calendarQuery && viewer.hasSessionInfo) { // This should only ever happen for "legacy" clients who call in without // providing this information. These clients wouldn't know how to deal with // the corresponding UpdateInfos anyways, so no reason to be worried. calendarQuery = viewer.calendarQuery; } else if (!calendarQuery) { calendarQuery = defaultCalendarQuery(viewer.platform, viewer.timeZone); } if (threadIDsNeedingDetailedFetch.size > 0) { - const threadSelectionCriteria = { threadCursors: {} }; + const messageSelectionCriteria = { threadCursors: {} }; for (const threadID of threadIDsNeedingDetailedFetch) { - threadSelectionCriteria.threadCursors[threadID] = false; + messageSelectionCriteria.threadCursors[threadID] = false; } promises.messageInfosResult = fetchMessageInfos( viewer, - threadSelectionCriteria, + messageSelectionCriteria, defaultNumberPerThread, ); const threadCalendarQuery = { ...calendarQuery, filters: [ ...nonThreadCalendarFilters(calendarQuery.filters), { type: 'threads', threadIDs: [...threadIDsNeedingDetailedFetch] }, ], }; promises.calendarResult = fetchEntryInfos(viewer, [threadCalendarQuery]); } if (entryIDsNeedingFetch.size > 0) { promises.entryInfosResult = fetchEntryInfosByID(viewer, [ ...entryIDsNeedingFetch, ]); } if (currentUserNeedsFetch) { promises.currentUserInfoResult = (async () => { const currentUserInfo = await fetchCurrentUserInfo(viewer); invariant(currentUserInfo.anonymous === undefined, 'should be logged in'); return currentUserInfo; })(); } const { threadResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, } = await promiseAll(promises); let threadInfosResult; if (viewerInfo.threadInfos) { const { threadInfos } = viewerInfo; threadInfosResult = { threadInfos }; } else if (threadResult) { threadInfosResult = threadResult; } else { threadInfosResult = { threadInfos: {} }; } return await updateInfosFromRawUpdateInfos(viewer, rawUpdateInfos, { threadInfosResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, }); } export type UpdateInfosRawData = { threadInfosResult: FetchThreadInfosResult, messageInfosResult: ?FetchMessageInfosResult, calendarResult: ?FetchEntryInfosBase, entryInfosResult: ?$ReadOnlyArray, currentUserInfoResult: ?OldLoggedInUserInfo | LoggedInUserInfo, }; async function updateInfosFromRawUpdateInfos( viewer: Viewer, rawUpdateInfos: $ReadOnlyArray, rawData: UpdateInfosRawData, ): Promise { const { threadInfosResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, } = rawData; const updateInfos = []; const userIDsToFetch = new Set(); for (const rawUpdateInfo of rawUpdateInfos) { if (rawUpdateInfo.type === updateTypes.DELETE_ACCOUNT) { updateInfos.push({ type: updateTypes.DELETE_ACCOUNT, id: rawUpdateInfo.id, time: rawUpdateInfo.time, deletedUserID: rawUpdateInfo.deletedUserID, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_THREAD) { const threadInfo = threadInfosResult.threadInfos[rawUpdateInfo.threadID]; if (!threadInfo) { console.warn( "failed to hydrate updateTypes.UPDATE_THREAD because we couldn't " + `fetch RawThreadInfo for ${rawUpdateInfo.threadID}`, ); continue; } updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadInfo, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS) { updateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadID: rawUpdateInfo.threadID, unread: rawUpdateInfo.unread, }); } else if (rawUpdateInfo.type === updateTypes.DELETE_THREAD) { updateInfos.push({ type: updateTypes.DELETE_THREAD, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadID: rawUpdateInfo.threadID, }); } else if (rawUpdateInfo.type === updateTypes.JOIN_THREAD) { const threadInfo = threadInfosResult.threadInfos[rawUpdateInfo.threadID]; if (!threadInfo) { console.warn( "failed to hydrate updateTypes.JOIN_THREAD because we couldn't " + `fetch RawThreadInfo for ${rawUpdateInfo.threadID}`, ); continue; } const rawEntryInfos = []; invariant(calendarResult, 'should be set'); for (const entryInfo of calendarResult.rawEntryInfos) { if (entryInfo.threadID === rawUpdateInfo.threadID) { rawEntryInfos.push(entryInfo); } } const rawMessageInfos = []; invariant(messageInfosResult, 'should be set'); for (const messageInfo of messageInfosResult.rawMessageInfos) { if (messageInfo.threadID === rawUpdateInfo.threadID) { rawMessageInfos.push(messageInfo); } } updateInfos.push({ type: updateTypes.JOIN_THREAD, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadInfo, rawMessageInfos, truncationStatus: messageInfosResult.truncationStatuses[rawUpdateInfo.threadID], rawEntryInfos, }); } else if (rawUpdateInfo.type === updateTypes.BAD_DEVICE_TOKEN) { updateInfos.push({ type: updateTypes.BAD_DEVICE_TOKEN, id: rawUpdateInfo.id, time: rawUpdateInfo.time, deviceToken: rawUpdateInfo.deviceToken, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_ENTRY) { invariant(entryInfosResult, 'should be set'); const entryInfo = entryInfosResult.find( candidate => candidate.id === rawUpdateInfo.entryID, ); if (!entryInfo) { console.warn( "failed to hydrate updateTypes.UPDATE_ENTRY because we couldn't " + `fetch RawEntryInfo for ${rawUpdateInfo.entryID}`, ); continue; } updateInfos.push({ type: updateTypes.UPDATE_ENTRY, id: rawUpdateInfo.id, time: rawUpdateInfo.time, entryInfo, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_CURRENT_USER) { invariant(currentUserInfoResult, 'should be set'); updateInfos.push({ type: updateTypes.UPDATE_CURRENT_USER, id: rawUpdateInfo.id, time: rawUpdateInfo.time, currentUserInfo: currentUserInfoResult, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_USER) { updateInfos.push({ type: updateTypes.UPDATE_USER, id: rawUpdateInfo.id, time: rawUpdateInfo.time, updatedUserID: rawUpdateInfo.updatedUserID, }); userIDsToFetch.add(rawUpdateInfo.updatedUserID); } else { invariant(false, `unrecognized updateType ${rawUpdateInfo.type}`); } } let userInfos = {}; if (userIDsToFetch.size > 0) { userInfos = await fetchKnownUserInfos(viewer, [...userIDsToFetch]); } updateInfos.sort(sortFunction); // Now we'll attempt to merge UpdateInfos so that we only have one per key const updateForKey: Map = new Map(); const mergedUpdates: ServerUpdateInfo[] = []; for (const updateInfo of updateInfos) { const key = keyForUpdateInfo(updateInfo); if (!key) { mergedUpdates.push(updateInfo); continue; } else if ( updateInfo.type === updateTypes.DELETE_THREAD || updateInfo.type === updateTypes.JOIN_THREAD || updateInfo.type === updateTypes.DELETE_ACCOUNT ) { updateForKey.set(key, updateInfo); continue; } const currentUpdateInfo = updateForKey.get(key); if (!currentUpdateInfo) { updateForKey.set(key, updateInfo); } else if ( updateInfo.type === updateTypes.UPDATE_THREAD && currentUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS ) { // UPDATE_THREAD trumps UPDATE_THREAD_READ_STATUS // Note that we keep the oldest UPDATE_THREAD updateForKey.set(key, updateInfo); } else if ( updateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS && currentUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS ) { // If we only have UPDATE_THREAD_READ_STATUS, keep the most recent updateForKey.set(key, updateInfo); } else if (updateInfo.type === updateTypes.UPDATE_ENTRY) { updateForKey.set(key, updateInfo); } else if (updateInfo.type === updateTypes.UPDATE_CURRENT_USER) { updateForKey.set(key, updateInfo); } } for (const [, updateInfo] of updateForKey) { mergedUpdates.push(updateInfo); } mergedUpdates.sort(sortFunction); return { updateInfos: mergedUpdates, userInfos }; } type PublishInfo = { updateTarget: UpdateTarget, rawUpdateInfos: RawUpdateInfo[], }; async function redisPublish( publishInfos: Iterator, dontBroadcastSession: ?string, ): Promise { for (const publishInfo of publishInfos) { const { updateTarget, rawUpdateInfos } = publishInfo; const redisMessage: NewUpdatesRedisMessage = { type: redisMessageTypes.NEW_UPDATES, updates: rawUpdateInfos, }; if (!updateTarget.sessionID && dontBroadcastSession) { redisMessage.ignoreSession = dontBroadcastSession; } publisher.sendMessage(updateTarget, redisMessage); } } function getTargetFromUpdateData(updateData: UpdateData): ?string { if (updateData.targetSession) { return updateData.targetSession; } else if (updateData.targetCookie) { return updateData.targetCookie; } else { return null; } } function getDeleteCondition(updateData: UpdateData): ?DeleteCondition { let types; if (updateData.type === updateTypes.DELETE_ACCOUNT) { types = new Set([updateTypes.DELETE_ACCOUNT, updateTypes.UPDATE_USER]); } else if (updateData.type === updateTypes.UPDATE_THREAD) { types = new Set([ updateTypes.UPDATE_THREAD, updateTypes.UPDATE_THREAD_READ_STATUS, ]); } else if (updateData.type === updateTypes.UPDATE_THREAD_READ_STATUS) { types = new Set([updateTypes.UPDATE_THREAD_READ_STATUS]); } else if ( updateData.type === updateTypes.DELETE_THREAD || updateData.type === updateTypes.JOIN_THREAD ) { types = 'all_types'; } else if (updateData.type === updateTypes.UPDATE_ENTRY) { types = 'all_types'; } else if (updateData.type === updateTypes.UPDATE_CURRENT_USER) { types = new Set([updateTypes.UPDATE_CURRENT_USER]); } else if (updateData.type === updateTypes.UPDATE_USER) { types = new Set([updateTypes.UPDATE_USER]); } else { return null; } const target = getTargetFromUpdateData(updateData); const { userID } = updateData; return { userID, target, types }; } function filterOnDeleteCondition( updateData: UpdateData, deleteCondition: DeleteCondition, ): boolean { invariant( updateData.userID === deleteCondition.userID, `updateData of type ${updateData.type} being compared to wrong userID`, ); if (deleteCondition.target) { const target = getTargetFromUpdateData(updateData); if (target !== deleteCondition.target) { return true; } } if (deleteCondition.types === 'all_types') { return false; } return !deleteCondition.types.has(updateData.type); } export { createUpdates, fetchUpdateInfosWithRawUpdateInfos }; diff --git a/server/src/fetchers/message-fetchers.js b/server/src/fetchers/message-fetchers.js index 579272cf3..fb9fad0af 100644 --- a/server/src/fetchers/message-fetchers.js +++ b/server/src/fetchers/message-fetchers.js @@ -1,611 +1,613 @@ // @flow import invariant from 'invariant'; import { sortMessageInfoList, shimUnsupportedRawMessageInfos, } from 'lib/shared/message-utils'; import { messageSpecs } from 'lib/shared/messages/message-specs'; import { notifCollapseKeyForRawMessageInfo } from 'lib/shared/notif-utils'; import { type RawMessageInfo, type RawComposableMessageInfo, type RawRobotextMessageInfo, messageTypes, type MessageType, assertMessageType, - type ThreadSelectionCriteria, + type MessageSelectionCriteria, type MessageTruncationStatus, messageTruncationStatus, type FetchMessageInfosResult, } from 'lib/types/message-types'; import { threadPermissions } from 'lib/types/thread-types'; import { ServerError } from 'lib/utils/errors'; import { dbQuery, SQL, mergeOrConditions } from '../database/database'; import type { PushInfo } from '../push/send'; import type { Viewer } from '../session/viewer'; import { creationString, localIDFromCreationString } from '../utils/idempotent'; import { mediaFromRow } from './upload-fetchers'; export type CollapsableNotifInfo = { collapseKey: ?string, existingMessageInfos: RawMessageInfo[], newMessageInfos: RawMessageInfo[], }; export type FetchCollapsableNotifsResult = { [userID: string]: CollapsableNotifInfo[], }; const visibleExtractString = `$.${threadPermissions.VISIBLE}.value`; // This function doesn't filter RawMessageInfos based on what messageTypes the // client supports, since each user can have multiple clients. The caller must // handle this filtering. async function fetchCollapsableNotifs( pushInfo: PushInfo, ): Promise { // First, we need to fetch any notifications that should be collapsed const usersToCollapseKeysToInfo = {}; const usersToCollapsableNotifInfo = {}; for (const userID in pushInfo) { usersToCollapseKeysToInfo[userID] = {}; usersToCollapsableNotifInfo[userID] = []; for (const rawMessageInfo of pushInfo[userID].messageInfos) { const collapseKey = notifCollapseKeyForRawMessageInfo(rawMessageInfo); if (!collapseKey) { const collapsableNotifInfo = { collapseKey, existingMessageInfos: [], newMessageInfos: [rawMessageInfo], }; usersToCollapsableNotifInfo[userID].push(collapsableNotifInfo); continue; } if (!usersToCollapseKeysToInfo[userID][collapseKey]) { usersToCollapseKeysToInfo[userID][collapseKey] = { collapseKey, existingMessageInfos: [], newMessageInfos: [], }; } usersToCollapseKeysToInfo[userID][collapseKey].newMessageInfos.push( rawMessageInfo, ); } } const sqlTuples = []; for (const userID in usersToCollapseKeysToInfo) { const collapseKeysToInfo = usersToCollapseKeysToInfo[userID]; for (const collapseKey in collapseKeysToInfo) { sqlTuples.push( SQL`(n.user = ${userID} AND n.collapse_key = ${collapseKey})`, ); } } if (sqlTuples.length === 0) { return usersToCollapsableNotifInfo; } const collapseQuery = SQL` SELECT m.id, m.thread AS threadID, m.content, m.time, m.type, m.user AS creatorID, stm.permissions AS subthread_permissions, n.user, n.collapse_key, up.id AS uploadID, up.type AS uploadType, up.secret AS uploadSecret, up.extra AS uploadExtra FROM notifications n LEFT JOIN messages m ON m.id = n.message LEFT JOIN uploads up ON m.type IN (${[messageTypes.IMAGES, messageTypes.MULTIMEDIA]}) AND JSON_CONTAINS(m.content, CAST(up.id as JSON), '$') LEFT JOIN memberships mm ON mm.thread = m.thread AND mm.user = n.user LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content AND stm.user = n.user WHERE n.rescinded = 0 AND JSON_EXTRACT(mm.permissions, ${visibleExtractString}) IS TRUE AND `; collapseQuery.append(mergeOrConditions(sqlTuples)); collapseQuery.append(SQL`ORDER BY m.time DESC`); const [collapseResult] = await dbQuery(collapseQuery); const rowsByUser = new Map(); for (const row of collapseResult) { const user = row.user.toString(); const currentRowsForUser = rowsByUser.get(user); if (currentRowsForUser) { currentRowsForUser.push(row); } else { rowsByUser.set(user, [row]); } } const derivedMessages = await fetchDerivedMessages(collapseResult); for (const userRows of rowsByUser.values()) { const messages = parseMessageSQLResult(userRows, derivedMessages); for (const message of messages) { const { rawMessageInfo, rows } = message; const [row] = rows; const info = usersToCollapseKeysToInfo[row.user][row.collapse_key]; info.existingMessageInfos.push(rawMessageInfo); } } for (const userID in usersToCollapseKeysToInfo) { const collapseKeysToInfo = usersToCollapseKeysToInfo[userID]; for (const collapseKey in collapseKeysToInfo) { const info = collapseKeysToInfo[collapseKey]; usersToCollapsableNotifInfo[userID].push({ collapseKey: info.collapseKey, existingMessageInfos: sortMessageInfoList(info.existingMessageInfos), newMessageInfos: sortMessageInfoList(info.newMessageInfos), }); } } return usersToCollapsableNotifInfo; } type MessageSQLResult = $ReadOnlyArray<{ rawMessageInfo: RawMessageInfo, rows: $ReadOnlyArray, }>; function parseMessageSQLResult( rows: $ReadOnlyArray, derivedMessages: $ReadOnlyMap< string, RawComposableMessageInfo | RawRobotextMessageInfo, >, viewer?: Viewer, ): MessageSQLResult { const rowsByID = new Map(); for (const row of rows) { const id = row.id.toString(); const currentRowsForID = rowsByID.get(id); if (currentRowsForID) { currentRowsForID.push(row); } else { rowsByID.set(id, [row]); } } const messages = []; for (const messageRows of rowsByID.values()) { const rawMessageInfo = rawMessageInfoFromRows( messageRows, viewer, derivedMessages, ); if (rawMessageInfo) { messages.push({ rawMessageInfo, rows: messageRows }); } } return messages; } function assertSingleRow(rows: $ReadOnlyArray): Object { if (rows.length === 0) { throw new Error('expected single row, but none present!'); } else if (rows.length !== 1) { const messageIDs = rows.map(row => row.id.toString()); console.warn( `expected single row, but there are multiple! ${messageIDs.join(', ')}`, ); } return rows[0]; } function mostRecentRowType(rows: $ReadOnlyArray): MessageType { if (rows.length === 0) { throw new Error('expected row, but none present!'); } return assertMessageType(rows[0].type); } function rawMessageInfoFromRows( rows: $ReadOnlyArray, viewer?: Viewer, derivedMessages: $ReadOnlyMap< string, RawComposableMessageInfo | RawRobotextMessageInfo, >, ): ?RawMessageInfo { const type = mostRecentRowType(rows); const messageSpec = messageSpecs[type]; if (type === messageTypes.IMAGES || type === messageTypes.MULTIMEDIA) { const media = rows.filter(row => row.uploadID).map(mediaFromRow); const [row] = rows; const localID = localIDFromCreationString(viewer, row.creation); invariant( messageSpec.rawMessageInfoFromServerDBRow, `multimedia message spec should have rawMessageInfoFromServerDBRow`, ); return messageSpec.rawMessageInfoFromServerDBRow(row, { media, derivedMessages, localID, }); } const row = assertSingleRow(rows); const localID = localIDFromCreationString(viewer, row.creation); invariant( messageSpec.rawMessageInfoFromServerDBRow, `message spec ${type} should have rawMessageInfoFromServerDBRow`, ); return messageSpec.rawMessageInfoFromServerDBRow(row, { derivedMessages, localID, }); } async function fetchMessageInfos( viewer: Viewer, - criteria: ThreadSelectionCriteria, + criteria: MessageSelectionCriteria, numberPerThread: number, ): Promise { - const threadSelectionClause = threadSelectionCriteriaToSQLClause(criteria); + const selectionClause = messageSelectionCriteriaToSQLClause(criteria); const truncationStatuses = {}; const viewerID = viewer.id; const query = SQL` SELECT * FROM ( SELECT x.id, x.content, x.time, x.type, x.user AS creatorID, x.creation, x.subthread_permissions, x.uploadID, x.uploadType, x.uploadSecret, x.uploadExtra, @num := if( @thread = x.thread, if(@message = x.id, @num, @num + 1), 1 ) AS number, @message := x.id AS messageID, @thread := x.thread AS threadID FROM (SELECT @num := 0, @thread := '', @message := '') init JOIN ( SELECT m.id, m.thread, m.user, m.content, m.time, m.type, m.creation, stm.permissions AS subthread_permissions, up.id AS uploadID, up.type AS uploadType, up.secret AS uploadSecret, up.extra AS uploadExtra FROM messages m LEFT JOIN uploads up ON m.type IN (${[messageTypes.IMAGES, messageTypes.MULTIMEDIA]}) AND JSON_CONTAINS(m.content, CAST(up.id as JSON), '$') LEFT JOIN memberships mm ON mm.thread = m.thread AND mm.user = ${viewerID} LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content AND stm.user = ${viewerID} WHERE JSON_EXTRACT(mm.permissions, ${visibleExtractString}) IS TRUE AND `; - query.append(threadSelectionClause); + query.append(selectionClause); query.append(SQL` ORDER BY m.thread, m.time DESC ) x ) y WHERE y.number <= ${numberPerThread} `); const [result] = await dbQuery(query); const derivedMessages = await fetchDerivedMessages(result, viewer); const messages = parseMessageSQLResult(result, derivedMessages, viewer); const rawMessageInfos = []; const threadToMessageCount = new Map(); for (const message of messages) { const { rawMessageInfo } = message; rawMessageInfos.push(rawMessageInfo); const { threadID } = rawMessageInfo; const currentCountValue = threadToMessageCount.get(threadID); const currentCount = currentCountValue ? currentCountValue : 0; threadToMessageCount.set(threadID, currentCount + 1); } for (const [threadID, messageCount] of threadToMessageCount) { // If there are fewer messages returned than the max for a given thread, // then our result set includes all messages in the query range for that // thread truncationStatuses[threadID] = messageCount < numberPerThread ? messageTruncationStatus.EXHAUSTIVE : messageTruncationStatus.TRUNCATED; } for (const rawMessageInfo of rawMessageInfos) { if (messageSpecs[rawMessageInfo.type].startsThread) { truncationStatuses[rawMessageInfo.threadID] = messageTruncationStatus.EXHAUSTIVE; } } for (const threadID in criteria.threadCursors) { const truncationStatus = truncationStatuses[threadID]; if (truncationStatus === null || truncationStatus === undefined) { // If nothing was returned for a thread that was explicitly queried for, // then our result set includes all messages in the query range for that // thread truncationStatuses[threadID] = messageTruncationStatus.EXHAUSTIVE; } else if (truncationStatus === messageTruncationStatus.TRUNCATED) { // If a cursor was specified for a given thread, then the result is // guaranteed to be contiguous with what the client has, and as such the // result should never be TRUNCATED truncationStatuses[threadID] = messageTruncationStatus.UNCHANGED; } } const shimmedRawMessageInfos = shimUnsupportedRawMessageInfos( rawMessageInfos, viewer.platformDetails, ); return { rawMessageInfos: shimmedRawMessageInfos, truncationStatuses, }; } -function threadSelectionCriteriaToSQLClause(criteria: ThreadSelectionCriteria) { +function messageSelectionCriteriaToSQLClause( + criteria: MessageSelectionCriteria, +) { const conditions = []; if (criteria.joinedThreads === true) { conditions.push(SQL`mm.role > 0`); } if (criteria.threadCursors) { for (const threadID in criteria.threadCursors) { const cursor = criteria.threadCursors[threadID]; if (cursor) { conditions.push(SQL`(m.thread = ${threadID} AND m.id < ${cursor})`); } else { conditions.push(SQL`m.thread = ${threadID}`); } } } if (conditions.length === 0) { throw new ServerError('internal_error'); } return mergeOrConditions(conditions); } -function threadSelectionCriteriaToInitialTruncationStatuses( - criteria: ThreadSelectionCriteria, +function messageSelectionCriteriaToInitialTruncationStatuses( + criteria: MessageSelectionCriteria, defaultTruncationStatus: MessageTruncationStatus, ) { const truncationStatuses = {}; if (criteria.threadCursors) { for (const threadID in criteria.threadCursors) { truncationStatuses[threadID] = defaultTruncationStatus; } } return truncationStatuses; } async function fetchMessageInfosSince( viewer: Viewer, - criteria: ThreadSelectionCriteria, + criteria: MessageSelectionCriteria, currentAsOf: number, maxNumberPerThread: number, ): Promise { - const threadSelectionClause = threadSelectionCriteriaToSQLClause(criteria); - const truncationStatuses = threadSelectionCriteriaToInitialTruncationStatuses( + const selectionClause = messageSelectionCriteriaToSQLClause(criteria); + const truncationStatuses = messageSelectionCriteriaToInitialTruncationStatuses( criteria, messageTruncationStatus.UNCHANGED, ); const viewerID = viewer.id; const query = SQL` SELECT m.id, m.thread AS threadID, m.content, m.time, m.type, m.creation, m.user AS creatorID, stm.permissions AS subthread_permissions, up.id AS uploadID, up.type AS uploadType, up.secret AS uploadSecret, up.extra AS uploadExtra FROM messages m LEFT JOIN uploads up ON m.type IN (${[messageTypes.IMAGES, messageTypes.MULTIMEDIA]}) AND JSON_CONTAINS(m.content, CAST(up.id as JSON), '$') LEFT JOIN memberships mm ON mm.thread = m.thread AND mm.user = ${viewerID} LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content AND stm.user = ${viewerID} WHERE m.time > ${currentAsOf} AND JSON_EXTRACT(mm.permissions, ${visibleExtractString}) IS TRUE AND `; - query.append(threadSelectionClause); + query.append(selectionClause); query.append(SQL` ORDER BY m.thread, m.time DESC `); const [result] = await dbQuery(query); const derivedMessages = await fetchDerivedMessages(result, viewer); const messages = parseMessageSQLResult(result, derivedMessages, viewer); const rawMessageInfos = []; let currentThreadID = null; let numMessagesForCurrentThreadID = 0; for (const message of messages) { const { rawMessageInfo } = message; const { threadID } = rawMessageInfo; if (threadID !== currentThreadID) { currentThreadID = threadID; numMessagesForCurrentThreadID = 1; truncationStatuses[threadID] = messageTruncationStatus.UNCHANGED; } else { numMessagesForCurrentThreadID++; } if (numMessagesForCurrentThreadID <= maxNumberPerThread) { if (messageSpecs[rawMessageInfo.type].startsThread) { truncationStatuses[threadID] = messageTruncationStatus.EXHAUSTIVE; } rawMessageInfos.push(rawMessageInfo); } else if (numMessagesForCurrentThreadID === maxNumberPerThread + 1) { truncationStatuses[threadID] = messageTruncationStatus.TRUNCATED; } } const shimmedRawMessageInfos = shimUnsupportedRawMessageInfos( rawMessageInfos, viewer.platformDetails, ); return { rawMessageInfos: shimmedRawMessageInfos, truncationStatuses, }; } function getMessageFetchResultFromRedisMessages( viewer: Viewer, rawMessageInfos: $ReadOnlyArray, ): FetchMessageInfosResult { const truncationStatuses = {}; for (const rawMessageInfo of rawMessageInfos) { truncationStatuses[rawMessageInfo.threadID] = messageTruncationStatus.UNCHANGED; } const shimmedRawMessageInfos = shimUnsupportedRawMessageInfos( rawMessageInfos, viewer.platformDetails, ); return { rawMessageInfos: shimmedRawMessageInfos, truncationStatuses, }; } async function fetchMessageInfoForLocalID( viewer: Viewer, localID: ?string, ): Promise { if (!localID || !viewer.hasSessionInfo) { return null; } const creation = creationString(viewer, localID); const viewerID = viewer.id; const query = SQL` SELECT m.id, m.thread AS threadID, m.content, m.time, m.type, m.creation, m.user AS creatorID, stm.permissions AS subthread_permissions, up.id AS uploadID, up.type AS uploadType, up.secret AS uploadSecret, up.extra AS uploadExtra FROM messages m LEFT JOIN uploads up ON m.type IN (${[messageTypes.IMAGES, messageTypes.MULTIMEDIA]}) AND JSON_CONTAINS(m.content, CAST(up.id as JSON), '$') LEFT JOIN memberships mm ON mm.thread = m.thread AND mm.user = ${viewerID} LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content AND stm.user = ${viewerID} WHERE m.user = ${viewerID} AND m.creation = ${creation} AND JSON_EXTRACT(mm.permissions, ${visibleExtractString}) IS TRUE `; const [result] = await dbQuery(query); if (result.length === 0) { return null; } const derivedMessages = await fetchDerivedMessages(result, viewer); return rawMessageInfoFromRows(result, viewer, derivedMessages); } const entryIDExtractString = '$.entryID'; async function fetchMessageInfoForEntryAction( viewer: Viewer, messageType: MessageType, entryID: string, threadID: string, ): Promise { const viewerID = viewer.id; const query = SQL` SELECT m.id, m.thread AS threadID, m.content, m.time, m.type, m.creation, m.user AS creatorID, up.id AS uploadID, up.type AS uploadType, up.secret AS uploadSecret, up.extra AS uploadExtra FROM messages m LEFT JOIN uploads up ON m.type IN (${[messageTypes.IMAGES, messageTypes.MULTIMEDIA]}) AND JSON_CONTAINS(m.content, CAST(up.id as JSON), '$') LEFT JOIN memberships mm ON mm.thread = m.thread AND mm.user = ${viewerID} WHERE m.user = ${viewerID} AND m.thread = ${threadID} AND m.type = ${messageType} AND JSON_EXTRACT(m.content, ${entryIDExtractString}) = ${entryID} AND JSON_EXTRACT(mm.permissions, ${visibleExtractString}) IS TRUE `; const [result] = await dbQuery(query); if (result.length === 0) { return null; } const derivedMessages = await fetchDerivedMessages(result, viewer); return rawMessageInfoFromRows(result, viewer, derivedMessages); } async function fetchMessageRowsByIDs(messageIDs: $ReadOnlyArray) { const query = SQL` SELECT m.id, m.thread AS threadID, m.content, m.time, m.type, m.creation, m.user AS creatorID, stm.permissions AS subthread_permissions, up.id AS uploadID, up.type AS uploadType, up.secret AS uploadSecret, up.extra AS uploadExtra FROM messages m LEFT JOIN uploads up ON m.type IN (${[messageTypes.IMAGES, messageTypes.MULTIMEDIA]}) AND JSON_CONTAINS(m.content, CAST(up.id as JSON), '$') LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content AND stm.user = m.user WHERE m.id IN (${messageIDs}) `; const [result] = await dbQuery(query); return result; } async function fetchDerivedMessages( rows: $ReadOnlyArray, viewer?: Viewer, ): Promise< $ReadOnlyMap, > { const requiredIDs = new Set(); for (const row of rows) { if (row.type === messageTypes.SIDEBAR_SOURCE) { const content = JSON.parse(row.content); requiredIDs.add(content.sourceMessageID); } } const messagesByID = new Map< string, RawComposableMessageInfo | RawRobotextMessageInfo, >(); if (requiredIDs.size === 0) { return messagesByID; } const result = await fetchMessageRowsByIDs([...requiredIDs]); const messages = parseMessageSQLResult(result, new Map(), viewer); for (const message of messages) { const { rawMessageInfo } = message; if (rawMessageInfo.id) { invariant( rawMessageInfo.type !== messageTypes.SIDEBAR_SOURCE, 'SIDEBAR_SOURCE should not point to a SIDEBAR_SOURCE', ); messagesByID.set(rawMessageInfo.id, rawMessageInfo); } } return messagesByID; } async function fetchMessageInfoByID( viewer?: Viewer, messageID: string, ): Promise { const result = await fetchMessageRowsByIDs([messageID]); if (result.length === 0) { return null; } const derivedMessages = await fetchDerivedMessages(result, viewer); return rawMessageInfoFromRows(result, viewer, derivedMessages); } export { fetchCollapsableNotifs, fetchMessageInfos, fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, fetchMessageInfoForLocalID, fetchMessageInfoForEntryAction, fetchMessageInfoByID, }; diff --git a/server/src/responders/user-responders.js b/server/src/responders/user-responders.js index 1f3e0049e..c7daca773 100644 --- a/server/src/responders/user-responders.js +++ b/server/src/responders/user-responders.js @@ -1,354 +1,354 @@ // @flow import invariant from 'invariant'; import t from 'tcomb'; import bcrypt from 'twin-bcrypt'; import type { ResetPasswordRequest, LogOutResponse, DeleteAccountRequest, RegisterResponse, RegisterRequest, LogInResponse, LogInRequest, UpdatePasswordRequest, AccessRequest, UpdateUserSettingsRequest, } from 'lib/types/account-types'; import { userSettingsTypes, notificationTypeValues, } from 'lib/types/account-types'; import { defaultNumberPerThread } from 'lib/types/message-types'; import type { SubscriptionUpdateRequest, SubscriptionUpdateResponse, } from 'lib/types/subscription-types'; import type { PasswordUpdate } from 'lib/types/user-types'; import { ServerError } from 'lib/utils/errors'; import { values } from 'lib/utils/objects'; import { promiseAll } from 'lib/utils/promises'; import { tShape, tPlatformDetails, tDeviceType, tPassword, tEmail, tOldValidUsername, } from 'lib/utils/validation-utils'; import createAccount from '../creators/account-creator'; import { dbQuery, SQL } from '../database/database'; import { deleteAccount } from '../deleters/account-deleters'; import { deleteCookie } from '../deleters/cookie-deleters'; import { sendAccessRequestEmailToAshoat } from '../emails/access-request'; import { fetchEntryInfos } from '../fetchers/entry-fetchers'; import { fetchMessageInfos } from '../fetchers/message-fetchers'; import { fetchThreadInfos } from '../fetchers/thread-fetchers'; import { fetchKnownUserInfos, fetchLoggedInUserInfo, } from '../fetchers/user-fetchers'; import { createNewAnonymousCookie, createNewUserCookie, setNewSession, } from '../session/cookies'; import type { Viewer } from '../session/viewer'; import { accountUpdater, checkAndSendVerificationEmail, checkAndSendPasswordResetEmail, updatePassword, updateUserSettings, } from '../updaters/account-updaters'; import { userSubscriptionUpdater } from '../updaters/user-subscription-updaters'; import { validateInput } from '../utils/validation-utils'; import { entryQueryInputValidator, newEntryQueryInputValidator, normalizeCalendarQuery, verifyCalendarQueryThreadIDs, } from './entry-responders'; const subscriptionUpdateRequestInputValidator = tShape({ threadID: t.String, updatedFields: tShape({ pushNotifs: t.maybe(t.Boolean), home: t.maybe(t.Boolean), }), }); async function userSubscriptionUpdateResponder( viewer: Viewer, input: any, ): Promise { const request: SubscriptionUpdateRequest = input; await validateInput(viewer, subscriptionUpdateRequestInputValidator, request); const threadSubscription = await userSubscriptionUpdater(viewer, request); return { threadSubscription }; } const accountUpdateInputValidator = tShape({ updatedFields: tShape({ email: t.maybe(tEmail), password: t.maybe(tPassword), }), currentPassword: tPassword, }); async function passwordUpdateResponder( viewer: Viewer, input: any, ): Promise { const request: PasswordUpdate = input; await validateInput(viewer, accountUpdateInputValidator, request); await accountUpdater(viewer, request); } async function sendVerificationEmailResponder(viewer: Viewer): Promise { await validateInput(viewer, null, null); await checkAndSendVerificationEmail(viewer); } const resetPasswordRequestInputValidator = tShape({ usernameOrEmail: t.union([tEmail, tOldValidUsername]), }); async function sendPasswordResetEmailResponder( viewer: Viewer, input: any, ): Promise { const request: ResetPasswordRequest = input; await validateInput(viewer, resetPasswordRequestInputValidator, request); await checkAndSendPasswordResetEmail(request); } async function logOutResponder(viewer: Viewer): Promise { await validateInput(viewer, null, null); if (viewer.loggedIn) { const [anonymousViewerData] = await Promise.all([ createNewAnonymousCookie({ platformDetails: viewer.platformDetails, deviceToken: viewer.deviceToken, }), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(anonymousViewerData); } return { currentUserInfo: { id: viewer.id, anonymous: true, }, }; } const deleteAccountRequestInputValidator = tShape({ password: tPassword, }); async function accountDeletionResponder( viewer: Viewer, input: any, ): Promise { const request: DeleteAccountRequest = input; await validateInput(viewer, deleteAccountRequestInputValidator, request); const result = await deleteAccount(viewer, request); invariant(result, 'deleteAccount should return result if handed request'); return result; } const deviceTokenUpdateRequestInputValidator = tShape({ deviceType: t.maybe(t.enums.of(['ios', 'android'])), deviceToken: t.String, }); const registerRequestInputValidator = tShape({ username: t.String, email: t.maybe(tEmail), password: tPassword, calendarQuery: t.maybe(newEntryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, }); async function accountCreationResponder( viewer: Viewer, input: any, ): Promise { const request: RegisterRequest = input; await validateInput(viewer, registerRequestInputValidator, request); return await createAccount(viewer, request); } const logInRequestInputValidator = tShape({ username: t.maybe(t.String), usernameOrEmail: t.maybe(t.union([tEmail, tOldValidUsername])), password: tPassword, watchedIDs: t.list(t.String), calendarQuery: t.maybe(entryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, }); async function logInResponder( viewer: Viewer, input: any, ): Promise { await validateInput(viewer, logInRequestInputValidator, input); const request: LogInRequest = input; const calendarQuery = request.calendarQuery ? normalizeCalendarQuery(request.calendarQuery) : null; const promises = {}; if (calendarQuery) { promises.verifyCalendarQueryThreadIDs = verifyCalendarQueryThreadIDs( calendarQuery, ); } const username = request.username ?? request.usernameOrEmail; if (!username) { throw new ServerError('invalid_parameters'); } const userQuery = SQL` SELECT id, hash, username FROM users WHERE LCASE(username) = LCASE(${username}) `; promises.userQuery = dbQuery(userQuery); const { userQuery: [userResult], } = await promiseAll(promises); if (userResult.length === 0) { throw new ServerError('invalid_parameters'); } const userRow = userResult[0]; if (!userRow.hash || !bcrypt.compareSync(request.password, userRow.hash)) { throw new ServerError('invalid_credentials'); } const id = userRow.id.toString(); const newServerTime = Date.now(); const deviceToken = request.deviceTokenUpdateRequest ? request.deviceTokenUpdateRequest.deviceToken : viewer.deviceToken; const [userViewerData] = await Promise.all([ createNewUserCookie(id, { platformDetails: request.platformDetails, deviceToken, }), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(userViewerData); if (calendarQuery) { await setNewSession(viewer, calendarQuery, newServerTime); } const threadCursors = {}; for (const watchedThreadID of request.watchedIDs) { threadCursors[watchedThreadID] = null; } - const threadSelectionCriteria = { threadCursors, joinedThreads: true }; + const messageSelectionCriteria = { threadCursors, joinedThreads: true }; const [ threadsResult, messagesResult, entriesResult, userInfos, currentUserInfo, ] = await Promise.all([ fetchThreadInfos(viewer), - fetchMessageInfos(viewer, threadSelectionCriteria, defaultNumberPerThread), + fetchMessageInfos(viewer, messageSelectionCriteria, defaultNumberPerThread), calendarQuery ? fetchEntryInfos(viewer, [calendarQuery]) : undefined, fetchKnownUserInfos(viewer), fetchLoggedInUserInfo(viewer), ]); const rawEntryInfos = entriesResult ? entriesResult.rawEntryInfos : null; const response: LogInResponse = { currentUserInfo, rawMessageInfos: messagesResult.rawMessageInfos, truncationStatuses: messagesResult.truncationStatuses, serverTime: newServerTime, userInfos: values(userInfos), cookieChange: { threadInfos: threadsResult.threadInfos, userInfos: [], }, }; if (rawEntryInfos) { response.rawEntryInfos = rawEntryInfos; } return response; } const updatePasswordRequestInputValidator = tShape({ code: t.String, password: tPassword, watchedIDs: t.list(t.String), calendarQuery: t.maybe(entryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, }); async function oldPasswordUpdateResponder( viewer: Viewer, input: any, ): Promise { await validateInput(viewer, updatePasswordRequestInputValidator, input); const request: UpdatePasswordRequest = input; if (request.calendarQuery) { request.calendarQuery = normalizeCalendarQuery(request.calendarQuery); } return await updatePassword(viewer, request); } const accessRequestInputValidator = tShape({ email: tEmail, platform: tDeviceType, }); async function requestAccessResponder( viewer: Viewer, input: any, ): Promise { const request: AccessRequest = input; await validateInput(viewer, accessRequestInputValidator, request); await sendAccessRequestEmailToAshoat(request); } const updateUserSettingsInputValidator = tShape({ name: t.irreducible( userSettingsTypes.DEFAULT_NOTIFICATIONS, x => x === userSettingsTypes.DEFAULT_NOTIFICATIONS, ), data: t.enums.of(notificationTypeValues), }); async function updateUserSettingsResponder( viewer: Viewer, input: any, ): Promise { const request: UpdateUserSettingsRequest = input; await validateInput(viewer, updateUserSettingsInputValidator, request); return await updateUserSettings(viewer, request); } export { userSubscriptionUpdateResponder, passwordUpdateResponder, sendVerificationEmailResponder, sendPasswordResetEmailResponder, logOutResponder, accountDeletionResponder, accountCreationResponder, logInResponder, oldPasswordUpdateResponder, requestAccessResponder, updateUserSettingsResponder, }; diff --git a/server/src/responders/website-responders.js b/server/src/responders/website-responders.js index b5ddcfe92..62855ea40 100644 --- a/server/src/responders/website-responders.js +++ b/server/src/responders/website-responders.js @@ -1,336 +1,336 @@ // @flow import html from 'common-tags/lib/html'; import type { $Response, $Request } from 'express'; import fs from 'fs'; import _keyBy from 'lodash/fp/keyBy'; import * as React from 'react'; import ReactDOMServer from 'react-dom/server'; import { Provider } from 'react-redux'; import { Route, StaticRouter } from 'react-router'; import { createStore, type Store } from 'redux'; import { promisify } from 'util'; import { daysToEntriesFromEntryInfos } from 'lib/reducers/entry-reducer'; import { freshMessageStore } from 'lib/reducers/message-reducer'; import { mostRecentReadThread } from 'lib/selectors/thread-selectors'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils'; import { threadHasPermission } from 'lib/shared/thread-utils'; import { defaultWebEnabledApps } from 'lib/types/enabled-apps'; import { defaultCalendarFilters } from 'lib/types/filter-types'; import { defaultNumberPerThread } from 'lib/types/message-types'; import { defaultEnabledReports } from 'lib/types/report-types'; import { defaultConnectionInfo } from 'lib/types/socket-types'; import { threadPermissions } from 'lib/types/thread-types'; import type { CurrentUserInfo } from 'lib/types/user-types'; import { currentDateInTimeZone } from 'lib/utils/date-utils'; import { ServerError } from 'lib/utils/errors'; import { promiseAll } from 'lib/utils/promises'; import { reducer } from 'web/redux/redux-setup'; import type { AppState, Action } from 'web/redux/redux-setup'; import getTitle from 'web/title/getTitle'; import { navInfoFromURL } from 'web/url-utils'; import { fetchEntryInfos } from '../fetchers/entry-fetchers'; import { fetchMessageInfos } from '../fetchers/message-fetchers'; import { fetchThreadInfos } from '../fetchers/thread-fetchers'; import { fetchCurrentUserInfo, fetchKnownUserInfos, } from '../fetchers/user-fetchers'; import { setNewSession } from '../session/cookies'; import { Viewer } from '../session/viewer'; import { streamJSON, waitForStream } from '../utils/json-stream'; import { getAppURLFacts } from '../utils/urls'; const { basePath, baseDomain } = getAppURLFacts(); const { renderToNodeStream } = ReactDOMServer; const baseURL = basePath.replace(/\/$/, ''); const baseHref = baseDomain + baseURL; const access = promisify(fs.access); const googleFontsURL = 'https://fonts.googleapis.com/css2?family=IBM+Plex+Sans:wght@400;500;600&family=Inter:wght@400;500;600&display=swap'; const localFontsURL = 'fonts/local-fonts.css'; async function getFontsURL() { try { await access(localFontsURL); return localFontsURL; } catch { return googleFontsURL; } } type AssetInfo = { jsURL: string, fontsURL: string, cssInclude: string }; let assetInfo: ?AssetInfo = null; async function getAssetInfo() { if (assetInfo) { return assetInfo; } if (process.env.NODE_ENV === 'development') { const fontsURL = await getFontsURL(); assetInfo = { jsURL: 'http://localhost:8080/dev.build.js', fontsURL, cssInclude: '', }; return assetInfo; } // $FlowFixMe web/dist doesn't always exist const { default: assets } = await import('web/dist/assets'); assetInfo = { jsURL: `compiled/${assets.browser.js}`, fontsURL: googleFontsURL, cssInclude: html` `, }; return assetInfo; } let webpackCompiledRootComponent: ?React.ComponentType<{}> = null; async function getWebpackCompiledRootComponentForSSR() { if (webpackCompiledRootComponent) { return webpackCompiledRootComponent; } try { // $FlowFixMe web/dist doesn't always exist const webpackBuild = await import('web/dist/app.build.cjs'); webpackCompiledRootComponent = webpackBuild.default.default; return webpackCompiledRootComponent; } catch { throw new Error( 'Could not load app.build.cjs. ' + 'Did you forget to run `yarn dev` in the web folder?', ); } } async function websiteResponder( viewer: Viewer, req: $Request, res: $Response, ): Promise { const appPromise = getWebpackCompiledRootComponentForSSR(); let initialNavInfo; try { initialNavInfo = navInfoFromURL(req.url, { now: currentDateInTimeZone(viewer.timeZone), }); } catch (e) { throw new ServerError(e.message); } const calendarQuery = { startDate: initialNavInfo.startDate, endDate: initialNavInfo.endDate, filters: defaultCalendarFilters, }; - const threadSelectionCriteria = { joinedThreads: true }; + const messageSelectionCriteria = { joinedThreads: true }; const initialTime = Date.now(); const assetInfoPromise = getAssetInfo(); const threadInfoPromise = fetchThreadInfos(viewer); const messageInfoPromise = fetchMessageInfos( viewer, - threadSelectionCriteria, + messageSelectionCriteria, defaultNumberPerThread, ); const entryInfoPromise = fetchEntryInfos(viewer, [calendarQuery]); const currentUserInfoPromise = fetchCurrentUserInfo(viewer); const userInfoPromise = fetchKnownUserInfos(viewer); const sessionIDPromise = (async () => { if (viewer.loggedIn) { await setNewSession(viewer, calendarQuery, initialTime); } return viewer.sessionID; })(); const threadStorePromise = (async () => { const { threadInfos } = await threadInfoPromise; return { threadInfos }; })(); const messageStorePromise = (async () => { const [ { threadInfos }, { rawMessageInfos, truncationStatuses }, ] = await Promise.all([threadInfoPromise, messageInfoPromise]); const { messageStore: freshStore } = freshMessageStore( rawMessageInfos, truncationStatuses, mostRecentMessageTimestamp(rawMessageInfos, initialTime), threadInfos, ); return freshStore; })(); const entryStorePromise = (async () => { const { rawEntryInfos } = await entryInfoPromise; return { entryInfos: _keyBy('id')(rawEntryInfos), daysToEntries: daysToEntriesFromEntryInfos(rawEntryInfos), lastUserInteractionCalendar: initialTime, }; })(); const userStorePromise = (async () => { const userInfos = await userInfoPromise; return { userInfos, inconsistencyReports: [] }; })(); const navInfoPromise = (async () => { const [{ threadInfos }, messageStore] = await Promise.all([ threadInfoPromise, messageStorePromise, ]); const finalNavInfo = initialNavInfo; const requestedActiveChatThreadID = finalNavInfo.activeChatThreadID; if ( requestedActiveChatThreadID && !threadHasPermission( threadInfos[requestedActiveChatThreadID], threadPermissions.VISIBLE, ) ) { finalNavInfo.activeChatThreadID = null; } if (!finalNavInfo.activeChatThreadID) { const mostRecentThread = mostRecentReadThread(messageStore, threadInfos); if (mostRecentThread) { finalNavInfo.activeChatThreadID = mostRecentThread; } } return finalNavInfo; })(); const { jsURL, fontsURL, cssInclude } = await assetInfoPromise; // prettier-ignore res.write(html` ${getTitle(0)} ${cssInclude}
`); const statePromises = { navInfo: navInfoPromise, currentUserInfo: ((currentUserInfoPromise: any): Promise), sessionID: sessionIDPromise, entryStore: entryStorePromise, threadStore: threadStorePromise, userStore: userStorePromise, messageStore: messageStorePromise, updatesCurrentAsOf: initialTime, loadingStatuses: {}, calendarFilters: defaultCalendarFilters, // We can use paths local to the on web urlPrefix: '', windowDimensions: { width: 0, height: 0 }, baseHref, connection: { ...defaultConnectionInfo('web', viewer.timeZone), actualizedCalendarQuery: calendarQuery, }, watchedThreadIDs: [], lifecycleState: 'active', enabledApps: defaultWebEnabledApps, reportStore: { enabledReports: defaultEnabledReports, queuedReports: [], }, nextLocalID: 0, timeZone: viewer.timeZone, userAgent: viewer.userAgent, cookie: undefined, deviceToken: undefined, dataLoaded: viewer.loggedIn, windowActive: true, }; const [stateResult, App] = await Promise.all([ promiseAll(statePromises), appPromise, ]); const state: AppState = { ...stateResult }; const store: Store = createStore(reducer, state); const routerContext = {}; const reactStream = renderToNodeStream( , ); if (routerContext.url) { throw new ServerError('URL modified during server render!'); } reactStream.pipe(res, { end: false }); await waitForStream(reactStream); res.write(html`
`); } export { websiteResponder }; diff --git a/server/src/socket/socket.js b/server/src/socket/socket.js index 14280f02f..9aab1a7dc 100644 --- a/server/src/socket/socket.js +++ b/server/src/socket/socket.js @@ -1,810 +1,810 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce'; import t from 'tcomb'; import WebSocket from 'ws'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils'; import type { Shape } from 'lib/types/core'; import { endpointIsSocketSafe } from 'lib/types/endpoints'; import { defaultNumberPerThread } from 'lib/types/message-types'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types'; import { cookieSources, sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, } from 'lib/types/socket-types'; import { ServerError } from 'lib/utils/errors'; import { values } from 'lib/utils/objects'; import { promiseAll } from 'lib/utils/promises'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver'; import sleep from 'lib/utils/sleep'; import { tShape, tCookie } from 'lib/utils/validation-utils'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters'; import { deleteCookie } from '../deleters/cookie-deleters'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters'; import { jsonEndpoints } from '../endpoints'; import { fetchEntryInfos } from '../fetchers/entry-fetchers'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers'; import { fetchThreadInfos } from '../fetchers/thread-fetchers'; import { fetchUpdateInfos } from '../fetchers/update-fetchers'; import { fetchCurrentUserInfo, fetchKnownUserInfos, } from '../fetchers/user-fetchers'; import { newEntryQueryInputValidator, verifyCalendarQueryThreadIDs, } from '../responders/entry-responders'; import { handleAsyncPromise } from '../responders/handlers'; import { fetchViewerForSocket, extendCookieLifespan, createNewAnonymousCookie, } from '../session/cookies'; import { Viewer } from '../session/viewer'; import { commitSessionUpdate } from '../updaters/session-updaters'; import { assertSecureRequest } from '../utils/security-utils'; import { checkInputValidator, checkClientSupported, } from '../utils/validation-utils'; import { RedisSubscriber } from './redis'; import { clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils'; const clientSocketMessageInputValidator = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: newEntryQueryInputValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.Object, }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ) => { invariant(typeof messageString === 'string', 'message should be string'); let clientSocketMessage: ?ClientSocketMessage; try { this.resetTimeout(); const message = JSON.parse(messageString); checkInputValidator(clientSocketMessageInputValidator, message); clientSocketMessage = message; if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessage, ); if (!this.viewer) { // This indicates that the cookie was invalid, but the client is using // cookieSources.HEADER and thus can't accept a new cookie over // WebSockets. See comment under catch block for socket_deauthorized. throw new ServerError('socket_deauthorized'); } } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessage, ); const serverResponses = await this.handleClientSocketMessage( clientSocketMessage, ); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { handleAsyncPromise(extendCookieLifespan(viewer.cookieID)); } for (const response of serverResponses) { this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; const responseTo = clientSocketMessage ? clientSocketMessage.id : null; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); this.sendMessage(errorMessage); return; } invariant(clientSocketMessage, 'should be set'); const responseTo = clientSocketMessage.id; if (error.message === 'socket_deauthorized') { const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (this.viewer) { // viewer should only be falsey for cookieSources.HEADER (web) // clients. Usually if the cookie is invalid we construct a new // anonymous Viewer with a new cookie, and then pass the cookie down // in the error. But we can't pass HTTP cookies in WebSocket messages. authErrorMessage.sessionChange = { cookie: this.viewer.cookiePairString, currentUserInfo: { id: this.viewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const promises = {}; promises.deleteCookie = deleteCookie(viewer.cookieID); if (viewer.cookieSource !== cookieSources.BODY) { promises.anonymousViewerData = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); } const { anonymousViewerData } = await promiseAll(promises); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, }; if (anonymousViewerData) { // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); authErrorMessage.sessionChange = { cookie: anonViewer.cookiePairString, currentUserInfo: { id: anonViewer.cookieID, anonymous: true, }, }; } this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState === 1) { this.ws.send(JSON.stringify(message)); } }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } - const threadSelectionCriteria = { threadCursors, joinedThreads: true }; + const messageSelectionCriteria = { threadCursors, joinedThreads: true }; const [ fetchMessagesResult, { serverRequests, activityUpdateResult }, ] = await Promise.all([ fetchMessageInfosSince( viewer, - threadSelectionCriteria, + messageSelectionCriteria, oldMessagesCurrentAsOf, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; if (!sessionInitializationResult.sessionContinued) { const [ threadsResult, entriesResult, currentUserInfo, knownUserInfos, ] = await Promise.all([ fetchThreadInfos(viewer), fetchEntryInfos(viewer, [calendarQuery]), fetchCurrentUserInfo(viewer), fetchKnownUserInfos(viewer), ]); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: threadsResult.threadInfos, currentUserInfo, rawEntryInfos: entriesResult.rawEntryInfos, userInfos: values(knownUserInfos), updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters sessionIdentifierTypes.BODY_SESSION_ID, // but the session is unspecified or expired, it will set a new sessionID // and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult, } = sessionInitializationResult; const promises = {}; promises.deleteExpiredUpdates = deleteUpdatesBeforeTimeTargetingSession( viewer, oldUpdatesCurrentAsOf, ); promises.fetchUpdateResult = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); promises.sessionUpdate = commitSessionUpdate(viewer, sessionUpdate); const { fetchUpdateResult } = await promiseAll(promises); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, viewer.calendarQuery, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; const response = await responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos, } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Shape) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart() { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { this.initiateStateCheck(); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState( viewer, { status: 'state_check' }, viewer.calendarQuery, ); invariant(checkStateRequest, 'should be set'); this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection }; diff --git a/server/src/updaters/thread-updaters.js b/server/src/updaters/thread-updaters.js index 2c694aefa..f1c47e290 100644 --- a/server/src/updaters/thread-updaters.js +++ b/server/src/updaters/thread-updaters.js @@ -1,841 +1,841 @@ // @flow import { filteredThreadIDs } from 'lib/selectors/calendar-filter-selectors'; import { threadHasAdminRole, roleIsAdminRole, viewerIsMember, getThreadTypeParentRequirement, } from 'lib/shared/thread-utils'; import { hasMinCodeVersion } from 'lib/shared/version-utils'; import type { Shape } from 'lib/types/core'; import { messageTypes, defaultNumberPerThread } from 'lib/types/message-types'; import { type RoleChangeRequest, type ChangeThreadSettingsResult, type RemoveMembersRequest, type LeaveThreadRequest, type LeaveThreadResult, type UpdateThreadRequest, type ServerThreadJoinRequest, type ThreadJoinResult, threadPermissions, threadTypes, } from 'lib/types/thread-types'; import { updateTypes } from 'lib/types/update-types'; import { ServerError } from 'lib/utils/errors'; import { promiseAll } from 'lib/utils/promises'; import { firstLine } from 'lib/utils/string-utils'; import createMessages from '../creators/message-creator'; import { getRolePermissionBlobs } from '../creators/role-creator'; import { createUpdates } from '../creators/update-creator'; import { dbQuery, SQL } from '../database/database'; import { fetchEntryInfos } from '../fetchers/entry-fetchers'; import { fetchMessageInfos } from '../fetchers/message-fetchers'; import { fetchThreadInfos, fetchServerThreadInfos, determineThreadAncestry, } from '../fetchers/thread-fetchers'; import { checkThreadPermission, viewerIsMember as fetchViewerIsMember, checkThread, validateCandidateMembers, } from '../fetchers/thread-permission-fetchers'; import { verifyUserIDs, verifyUserOrCookieIDs, } from '../fetchers/user-fetchers'; import type { Viewer } from '../session/viewer'; import RelationshipChangeset from '../utils/relationship-changeset'; import { updateRoles } from './role-updaters'; import { changeRole, recalculateThreadPermissions, commitMembershipChangeset, } from './thread-permission-updaters'; async function updateRole( viewer: Viewer, request: RoleChangeRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [memberIDs, hasPermission] = await Promise.all([ verifyUserIDs(request.memberIDs), checkThreadPermission( viewer, request.threadID, threadPermissions.CHANGE_ROLE, ), ]); if (memberIDs.length === 0) { throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_credentials'); } const query = SQL` SELECT user, role FROM memberships WHERE user IN (${memberIDs}) AND thread = ${request.threadID} `; const [result] = await dbQuery(query); let nonMemberUser = false; let numResults = 0; for (const row of result) { if (row.role <= 0) { nonMemberUser = true; break; } numResults++; } if (nonMemberUser || numResults < memberIDs.length) { throw new ServerError('invalid_parameters'); } const changeset = await changeRole(request.threadID, memberIDs, request.role); const { threadInfos, viewerUpdates } = await commitMembershipChangeset( viewer, changeset, ); const messageData = { type: messageTypes.CHANGE_ROLE, threadID: request.threadID, creatorID: viewer.userID, time: Date.now(), userIDs: memberIDs, newRole: request.role, }; const newMessageInfos = await createMessages(viewer, [messageData]); if (hasMinCodeVersion(viewer.platformDetails, 62)) { return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } return { threadInfo: threadInfos[request.threadID], threadInfos, updatesResult: { newUpdates: viewerUpdates, }, newMessageInfos, }; } async function removeMembers( viewer: Viewer, request: RemoveMembersRequest, ): Promise { const viewerID = viewer.userID; if (request.memberIDs.includes(viewerID)) { throw new ServerError('invalid_parameters'); } const [memberIDs, hasPermission] = await Promise.all([ verifyUserOrCookieIDs(request.memberIDs), checkThreadPermission( viewer, request.threadID, threadPermissions.REMOVE_MEMBERS, ), ]); if (memberIDs.length === 0) { throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_credentials'); } const query = SQL` SELECT m.user, m.role, t.default_role FROM memberships m LEFT JOIN threads t ON t.id = m.thread WHERE m.user IN (${memberIDs}) AND m.thread = ${request.threadID} `; const [result] = await dbQuery(query); let nonDefaultRoleUser = false; const actualMemberIDs = []; for (const row of result) { if (row.role <= 0) { continue; } actualMemberIDs.push(row.user.toString()); if (row.role !== row.default_role) { nonDefaultRoleUser = true; } } if (nonDefaultRoleUser) { const hasChangeRolePermission = await checkThreadPermission( viewer, request.threadID, threadPermissions.CHANGE_ROLE, ); if (!hasChangeRolePermission) { throw new ServerError('invalid_credentials'); } } const changeset = await changeRole(request.threadID, actualMemberIDs, 0); const { threadInfos, viewerUpdates } = await commitMembershipChangeset( viewer, changeset, ); const newMessageInfos = await (async () => { if (actualMemberIDs.length === 0) { return []; } const messageData = { type: messageTypes.REMOVE_MEMBERS, threadID: request.threadID, creatorID: viewerID, time: Date.now(), removedUserIDs: actualMemberIDs, }; return await createMessages(viewer, [messageData]); })(); if (hasMinCodeVersion(viewer.platformDetails, 62)) { return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } return { threadInfo: threadInfos[request.threadID], threadInfos, updatesResult: { newUpdates: viewerUpdates, }, newMessageInfos, }; } async function leaveThread( viewer: Viewer, request: LeaveThreadRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [fetchThreadResult, hasPermission] = await Promise.all([ fetchThreadInfos(viewer, SQL`t.id = ${request.threadID}`), checkThreadPermission( viewer, request.threadID, threadPermissions.LEAVE_THREAD, ), ]); const threadInfo = fetchThreadResult.threadInfos[request.threadID]; if (!viewerIsMember(threadInfo)) { if (hasMinCodeVersion(viewer.platformDetails, 62)) { return { updatesResult: { newUpdates: [] }, }; } const { threadInfos } = await fetchThreadInfos(viewer); return { threadInfos, updatesResult: { newUpdates: [], }, }; } if (!hasPermission) { throw new ServerError('invalid_parameters'); } const viewerID = viewer.userID; if (threadHasAdminRole(threadInfo)) { let otherUsersExist = false; let otherAdminsExist = false; for (const member of threadInfo.members) { const role = member.role; if (!role || member.id === viewerID) { continue; } otherUsersExist = true; if (roleIsAdminRole(threadInfo.roles[role])) { otherAdminsExist = true; break; } } if (otherUsersExist && !otherAdminsExist) { throw new ServerError('invalid_parameters'); } } const changeset = await changeRole(request.threadID, [viewerID], 0); const { threadInfos, viewerUpdates } = await commitMembershipChangeset( viewer, changeset, ); const messageData = { type: messageTypes.LEAVE_THREAD, threadID: request.threadID, creatorID: viewerID, time: Date.now(), }; await createMessages(viewer, [messageData]); if (hasMinCodeVersion(viewer.platformDetails, 62)) { return { updatesResult: { newUpdates: viewerUpdates } }; } return { threadInfos, updatesResult: { newUpdates: viewerUpdates, }, }; } type UpdateThreadOptions = Shape<{ +forceAddMembers: boolean, +forceUpdateRoot: boolean, +silenceMessages: boolean, +ignorePermissions: boolean, }>; async function updateThread( viewer: Viewer, request: UpdateThreadRequest, options?: UpdateThreadOptions, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const forceAddMembers = options?.forceAddMembers ?? false; const forceUpdateRoot = options?.forceUpdateRoot ?? false; const silenceMessages = options?.silenceMessages ?? false; const ignorePermissions = (options?.ignorePermissions && viewer.isScriptViewer) ?? false; const validationPromises = {}; const changedFields = {}; const sqlUpdate = {}; const untrimmedName = request.changes.name; if (untrimmedName !== undefined && untrimmedName !== null) { const name = firstLine(untrimmedName); changedFields.name = name; sqlUpdate.name = name ?? null; } const { description } = request.changes; if (description !== undefined && description !== null) { changedFields.description = description; sqlUpdate.description = description ?? null; } if (request.changes.color) { const color = request.changes.color.toLowerCase(); changedFields.color = color; sqlUpdate.color = color; } const { parentThreadID } = request.changes; if (parentThreadID !== undefined) { // TODO some sort of message when this changes sqlUpdate.parent_thread_id = parentThreadID; } const threadType = request.changes.type; if (threadType !== null && threadType !== undefined) { changedFields.type = threadType; sqlUpdate.type = threadType; } if ( !ignorePermissions && threadType !== null && threadType !== undefined && threadType !== threadTypes.COMMUNITY_OPEN_SUBTHREAD && threadType !== threadTypes.COMMUNITY_SECRET_SUBTHREAD ) { throw new ServerError('invalid_parameters'); } const newMemberIDs = request.changes.newMemberIDs && request.changes.newMemberIDs.length > 0 ? [...new Set(request.changes.newMemberIDs)] : null; if ( Object.keys(sqlUpdate).length === 0 && !newMemberIDs && !forceUpdateRoot ) { throw new ServerError('invalid_parameters'); } validationPromises.serverThreadInfos = fetchServerThreadInfos( SQL`t.id = ${request.threadID}`, ); validationPromises.hasNecessaryPermissions = (async () => { if (ignorePermissions) { return; } const checks = []; if (sqlUpdate.name !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_NAME, }); } if (sqlUpdate.description !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_DESCRIPTION, }); } if (sqlUpdate.color !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_COLOR, }); } if (parentThreadID !== undefined || sqlUpdate.type !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_PERMISSIONS, }); } if (newMemberIDs) { checks.push({ check: 'permission', permission: threadPermissions.ADD_MEMBERS, }); } const hasNecessaryPermissions = await checkThread( viewer, request.threadID, checks, ); if (!hasNecessaryPermissions) { throw new ServerError('invalid_credentials'); } })(); const { serverThreadInfos } = await promiseAll(validationPromises); const serverThreadInfo = serverThreadInfos.threadInfos[request.threadID]; if (!serverThreadInfo) { throw new ServerError('internal_error'); } // Threads with source message should be visible to everyone, but we can't // guarantee it for COMMUNITY_SECRET_SUBTHREAD threads so we forbid it for // now. In the future, if we want to support this, we would need to unlink the // source message. if ( threadType !== null && threadType !== undefined && threadType !== threadTypes.SIDEBAR && threadType !== threadTypes.COMMUNITY_OPEN_SUBTHREAD && serverThreadInfo.sourceMessageID ) { throw new ServerError('invalid_parameters'); } // You can't change the parent thread of a current or former SIDEBAR if (parentThreadID !== undefined && serverThreadInfo.sourceMessageID) { throw new ServerError('invalid_parameters'); } const oldThreadType = serverThreadInfo.type; const oldParentThreadID = serverThreadInfo.parentThreadID; const oldContainingThreadID = serverThreadInfo.containingThreadID; const oldCommunity = serverThreadInfo.community; const oldDepth = serverThreadInfo.depth; const nextThreadType = threadType !== null && threadType !== undefined ? threadType : oldThreadType; let nextParentThreadID = parentThreadID !== undefined ? parentThreadID : oldParentThreadID; // Does the new thread type preclude a parent? if ( threadType !== undefined && threadType !== null && getThreadTypeParentRequirement(threadType) === 'disabled' && nextParentThreadID !== null ) { nextParentThreadID = null; sqlUpdate.parent_thread_id = null; } // Does the new thread type require a parent? if ( threadType !== undefined && threadType !== null && getThreadTypeParentRequirement(threadType) === 'required' && nextParentThreadID === null ) { throw new ServerError('no_parent_thread_specified'); } const determineThreadAncestryPromise = determineThreadAncestry( nextParentThreadID, nextThreadType, ); const confirmParentPermissionPromise = (async () => { if (ignorePermissions || !nextParentThreadID) { return; } if ( nextParentThreadID === oldParentThreadID && (nextThreadType === threadTypes.SIDEBAR) === (oldThreadType === threadTypes.SIDEBAR) ) { return; } const hasParentPermission = await checkThreadPermission( viewer, nextParentThreadID, nextThreadType === threadTypes.SIDEBAR ? threadPermissions.CREATE_SIDEBARS : threadPermissions.CREATE_SUBCHANNELS, ); if (!hasParentPermission) { throw new ServerError('invalid_parameters'); } })(); const rolesNeedUpdate = forceUpdateRoot || nextThreadType !== oldThreadType; const validateNewMembersPromise = (async () => { if (!newMemberIDs || ignorePermissions) { return; } const defaultRolePermissionsPromise = (async () => { let rolePermissions; if (!rolesNeedUpdate) { const rolePermissionsQuery = SQL` SELECT r.permissions FROM threads t LEFT JOIN roles r ON r.id = t.default_role WHERE t.id = ${request.threadID} `; const [result] = await dbQuery(rolePermissionsQuery); if (result.length > 0) { rolePermissions = result[0].permissions; } } if (!rolePermissions) { rolePermissions = getRolePermissionBlobs(nextThreadType).Members; } return rolePermissions; })(); const [defaultRolePermissions, nextThreadAncestry] = await Promise.all([ defaultRolePermissionsPromise, determineThreadAncestryPromise, ]); const { newMemberIDs: validatedIDs } = await validateCandidateMembers( viewer, { newMemberIDs }, { threadType: nextThreadType, parentThreadID: nextParentThreadID, containingThreadID: nextThreadAncestry.containingThreadID, defaultRolePermissions, }, { requireRelationship: !forceAddMembers }, ); if ( validatedIDs && Number(validatedIDs?.length) < Number(newMemberIDs?.length) ) { throw new ServerError('invalid_credentials'); } })(); const { nextThreadAncestry } = await promiseAll({ nextThreadAncestry: determineThreadAncestryPromise, confirmParentPermissionPromise, validateNewMembersPromise, }); if (nextThreadAncestry.containingThreadID !== oldContainingThreadID) { sqlUpdate.containing_thread_id = nextThreadAncestry.containingThreadID; } if (nextThreadAncestry.community !== oldCommunity) { if (!ignorePermissions) { throw new ServerError('invalid_parameters'); } sqlUpdate.community = nextThreadAncestry.community; } if (nextThreadAncestry.depth !== oldDepth) { sqlUpdate.depth = nextThreadAncestry.depth; } const updateQueryPromise = (async () => { if (Object.keys(sqlUpdate).length === 0) { return; } const updateQuery = SQL` UPDATE threads SET ${sqlUpdate} WHERE id = ${request.threadID} `; await dbQuery(updateQuery); })(); const updateRolesPromise = (async () => { if (rolesNeedUpdate) { await updateRoles(viewer, request.threadID, nextThreadType); } })(); const intermediatePromises = {}; intermediatePromises.updateQuery = updateQueryPromise; intermediatePromises.updateRoles = updateRolesPromise; if (newMemberIDs) { intermediatePromises.addMembersChangeset = (async () => { await Promise.all([updateQueryPromise, updateRolesPromise]); return await changeRole(request.threadID, newMemberIDs, null, { setNewMembersToUnread: true, }); })(); } const threadRootChanged = rolesNeedUpdate || nextParentThreadID !== oldParentThreadID; if (threadRootChanged) { intermediatePromises.recalculatePermissionsChangeset = (async () => { await Promise.all([updateQueryPromise, updateRolesPromise]); return await recalculateThreadPermissions(request.threadID); })(); } const { addMembersChangeset, recalculatePermissionsChangeset, } = await promiseAll(intermediatePromises); const membershipRows = []; const relationshipChangeset = new RelationshipChangeset(); if (recalculatePermissionsChangeset) { const { membershipRows: recalculateMembershipRows, relationshipChangeset: recalculateRelationshipChangeset, } = recalculatePermissionsChangeset; membershipRows.push(...recalculateMembershipRows); relationshipChangeset.addAll(recalculateRelationshipChangeset); } let addedMemberIDs; if (addMembersChangeset) { const { membershipRows: addMembersMembershipRows, relationshipChangeset: addMembersRelationshipChangeset, } = addMembersChangeset; addedMemberIDs = addMembersMembershipRows .filter(row => row.operation === 'save' && Number(row.role) > 0) .map(row => row.userID); membershipRows.push(...addMembersMembershipRows); relationshipChangeset.addAll(addMembersRelationshipChangeset); } const changeset = { membershipRows, relationshipChangeset }; const { threadInfos, viewerUpdates } = await commitMembershipChangeset( viewer, changeset, { // This forces an update for this thread, // regardless of whether any membership rows are changed changedThreadIDs: Object.keys(sqlUpdate).length > 0 ? new Set([request.threadID]) : new Set(), }, ); let newMessageInfos = []; if (!silenceMessages) { const time = Date.now(); const messageDatas = []; for (const fieldName in changedFields) { const newValue = changedFields[fieldName]; messageDatas.push({ type: messageTypes.CHANGE_SETTINGS, threadID: request.threadID, creatorID: viewer.userID, time, field: fieldName, value: newValue, }); } if (addedMemberIDs && addedMemberIDs.length > 0) { messageDatas.push({ type: messageTypes.ADD_MEMBERS, threadID: request.threadID, creatorID: viewer.userID, time, addedUserIDs: addedMemberIDs, }); } newMessageInfos = await createMessages(viewer, messageDatas); } if (hasMinCodeVersion(viewer.platformDetails, 62)) { return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } return { threadInfo: threadInfos[request.threadID], threadInfos, updatesResult: { newUpdates: viewerUpdates, }, newMessageInfos, }; } async function joinThread( viewer: Viewer, request: ServerThreadJoinRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [isMember, hasPermission] = await Promise.all([ fetchViewerIsMember(viewer, request.threadID), checkThreadPermission( viewer, request.threadID, threadPermissions.JOIN_THREAD, ), ]); if (!hasPermission) { throw new ServerError('invalid_parameters'); } // TODO: determine code version const hasCodeVersionBelow87 = !hasMinCodeVersion(viewer.platformDetails, 87); const hasCodeVersionBelow62 = !hasMinCodeVersion(viewer.platformDetails, 62); const { calendarQuery } = request; if (isMember) { const response: ThreadJoinResult = { rawMessageInfos: [], truncationStatuses: {}, userInfos: {}, updatesResult: { newUpdates: [], }, }; if (calendarQuery && hasCodeVersionBelow87) { response.rawEntryInfos = []; } if (hasCodeVersionBelow62) { response.threadInfos = {}; } return response; } if (calendarQuery) { const threadFilterIDs = filteredThreadIDs(calendarQuery.filters); if ( !threadFilterIDs || threadFilterIDs.size !== 1 || threadFilterIDs.values().next().value !== request.threadID ) { throw new ServerError('invalid_parameters'); } } const changeset = await changeRole(request.threadID, [viewer.userID], null); const membershipResult = await commitMembershipChangeset(viewer, changeset, { calendarQuery, }); const messageData = { type: messageTypes.JOIN_THREAD, threadID: request.threadID, creatorID: viewer.userID, time: Date.now(), }; const newMessages = await createMessages(viewer, [messageData]); - const threadSelectionCriteria = { + const messageSelectionCriteria = { threadCursors: { [request.threadID]: false }, }; if (!hasCodeVersionBelow87) { return { rawMessageInfos: newMessages, truncationStatuses: {}, userInfos: membershipResult.userInfos, updatesResult: { newUpdates: membershipResult.viewerUpdates, }, }; } const [fetchMessagesResult, fetchEntriesResult] = await Promise.all([ - fetchMessageInfos(viewer, threadSelectionCriteria, defaultNumberPerThread), + fetchMessageInfos(viewer, messageSelectionCriteria, defaultNumberPerThread), calendarQuery ? fetchEntryInfos(viewer, [calendarQuery]) : undefined, ]); const rawEntryInfos = fetchEntriesResult && fetchEntriesResult.rawEntryInfos; const response: ThreadJoinResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, userInfos: membershipResult.userInfos, updatesResult: { newUpdates: membershipResult.viewerUpdates, }, }; if (hasCodeVersionBelow62) { response.threadInfos = membershipResult.threadInfos; } if (rawEntryInfos) { response.rawEntryInfos = rawEntryInfos; } return response; } async function updateThreadMembers(viewer: Viewer) { const { threadInfos } = await fetchThreadInfos( viewer, SQL`t.parent_thread_id IS NOT NULL `, ); const updateDatas = []; const time = Date.now(); for (const threadID in threadInfos) { updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID: viewer.id, time, threadID: threadID, targetSession: viewer.session, }); } await createUpdates(updateDatas); } export { updateRole, removeMembers, leaveThread, updateThread, joinThread, updateThreadMembers, };