diff --git a/keyserver/src/creators/message-report-creator.js b/keyserver/src/creators/message-report-creator.js index 4729d03c3..3436139e5 100644 --- a/keyserver/src/creators/message-report-creator.js +++ b/keyserver/src/creators/message-report-creator.js @@ -1,141 +1,144 @@ // @flow import bots from 'lib/facts/bots.js'; import { createMessageQuote } from 'lib/shared/message-utils.js'; import { type MessageReportCreationRequest } from 'lib/types/message-report-types.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import type { RawMessageInfo } from 'lib/types/message-types.js'; import type { ServerThreadInfo } from 'lib/types/thread-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { promiseAll } from 'lib/utils/promises.js'; import createMessages from './message-creator.js'; import { createCommbotThread } from '../bots/commbot.js'; import { fetchMessageInfoByID } from '../fetchers/message-fetchers.js'; import { fetchPersonalThreadID, serverThreadInfoFromMessageInfo, } from '../fetchers/thread-fetchers.js'; import { fetchUsername, fetchKeyserverAdminID, } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; const { commbot } = bots; type MessageReportData = { +reportedMessageText: ?string, +reporterUsername: ?string, +commbotThreadID: string, +reportedThread: ?ServerThreadInfo, +reportedMessageAuthor: ?string, }; async function createMessageReport( viewer: Viewer, request: MessageReportCreationRequest, ): Promise { const { reportedMessageText, reporterUsername, commbotThreadID, reportedThread, reportedMessageAuthor, } = await fetchMessageReportData(viewer, request); const reportMessage = getCommbotMessage( reporterUsername, reportedMessageAuthor, reportedThread?.name, reportedMessageText, ); const time = Date.now(); const result = await createMessages(viewer, [ { type: messageTypes.TEXT, threadID: commbotThreadID, creatorID: commbot.userID, time, text: reportMessage, }, ]); if (result.length === 0) { throw new ServerError('message_report_failed'); } return result; } async function fetchMessageReportData( viewer: Viewer, request: MessageReportCreationRequest, ): Promise { const keyserverAdminIDPromise = fetchKeyserverAdminID(); const reportedMessagePromise = fetchMessageInfoByID( viewer, request.messageID, ); - const promises = {}; - - promises.viewerUsername = fetchUsername(viewer.id); + const viewerUsernamePromise = fetchUsername(viewer.id); const keyserverAdminID = await keyserverAdminIDPromise; if (!keyserverAdminID) { throw new ServerError('keyserver_admin_not_found'); } - promises.commbotThreadID = getCommbotThreadID(keyserverAdminID); + const commbotThreadIDPromise = getCommbotThreadID(keyserverAdminID); const reportedMessage = await reportedMessagePromise; - if (reportedMessage) { - promises.reportedThread = serverThreadInfoFromMessageInfo(reportedMessage); - } + const reportedThreadPromise: Promise = reportedMessage + ? serverThreadInfoFromMessageInfo(reportedMessage) + : Promise.resolve(undefined); const reportedMessageAuthorID = reportedMessage?.creatorID; - if (reportedMessageAuthorID) { - promises.reportedMessageAuthor = fetchUsername(reportedMessageAuthorID); - } + const reportedMessageAuthorPromise: Promise = reportedMessageAuthorID + ? fetchUsername(reportedMessageAuthorID) + : Promise.resolve(undefined); const reportedMessageText = reportedMessage?.type === 0 ? reportedMessage.text : null; const { viewerUsername, commbotThreadID, reportedThread, reportedMessageAuthor, - } = await promiseAll(promises); + } = await promiseAll({ + viewerUsername: viewerUsernamePromise, + commbotThreadID: commbotThreadIDPromise, + reportedThread: reportedThreadPromise, + reportedMessageAuthor: reportedMessageAuthorPromise, + }); return { reportedMessageText, reporterUsername: viewerUsername, commbotThreadID, reportedThread, reportedMessageAuthor, }; } async function getCommbotThreadID(userID: string): Promise { const commbotThreadID = await fetchPersonalThreadID(userID, commbot.userID); return commbotThreadID ?? createCommbotThread(userID); } function getCommbotMessage( reporterUsername: ?string, messageAuthorUsername: ?string, threadName: ?string, message: ?string, ): string { reporterUsername = reporterUsername ?? '[null]'; const messageAuthor = messageAuthorUsername ? `${messageAuthorUsername}’s` : 'this'; const thread = threadName ? `chat titled "${threadName}"` : 'chat'; const reply = message ? createMessageQuote(message) : 'non-text message'; return ( `${reporterUsername} reported ${messageAuthor} message in ${thread}\n` + reply ); } export default createMessageReport; diff --git a/keyserver/src/creators/thread-creator.js b/keyserver/src/creators/thread-creator.js index a1bc370cd..29f32b561 100644 --- a/keyserver/src/creators/thread-creator.js +++ b/keyserver/src/creators/thread-creator.js @@ -1,510 +1,511 @@ // @flow import invariant from 'invariant'; import bots from 'lib/facts/bots.js'; import genesis from 'lib/facts/genesis.js'; import { getRolePermissionBlobs } from 'lib/permissions/thread-permissions.js'; import { generatePendingThreadColor, generateRandomColor, } from 'lib/shared/color-utils.js'; import { isInvalidSidebarSource } from 'lib/shared/message-utils.js'; import { getThreadTypeParentRequirement } from 'lib/shared/thread-utils.js'; import type { Shape } from 'lib/types/core.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import type { RawMessageInfo, MessageData } from 'lib/types/message-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { threadTypes, threadTypeIsCommunityRoot, } from 'lib/types/thread-types-enum.js'; import { type ServerNewThreadRequest, type NewThreadResponse, } from 'lib/types/thread-types.js'; import type { ServerUpdateInfo } from 'lib/types/update-types.js'; import type { UserInfos } from 'lib/types/user-types.js'; import { pushAll } from 'lib/utils/array.js'; import { ServerError } from 'lib/utils/errors.js'; import { promiseAll } from 'lib/utils/promises.js'; import { firstLine } from 'lib/utils/string-utils.js'; import createIDs from './id-creator.js'; import createMessages from './message-creator.js'; import { createInitialRolesForNewThread } from './role-creator.js'; import type { UpdatesForCurrentSession } from './update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchLatestEditMessageContentByID, fetchMessageInfoByID, } from '../fetchers/message-fetchers.js'; import { determineThreadAncestry, personalThreadQuery, } from '../fetchers/thread-fetchers.js'; import { checkThreadPermission, validateCandidateMembers, } from '../fetchers/thread-permission-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { changeRole, recalculateThreadPermissions, commitMembershipChangeset, getChangesetCommitResultForExistingThread, type MembershipChangeset, } from '../updaters/thread-permission-updaters.js'; import { joinThread } from '../updaters/thread-updaters.js'; import RelationshipChangeset from '../utils/relationship-changeset.js'; const { commbot } = bots; const privateThreadDescription: string = 'This is your private chat, ' + 'where you can set reminders and jot notes in private!'; type CreateThreadOptions = Shape<{ +forceAddMembers: boolean, +updatesForCurrentSession: UpdatesForCurrentSession, +silentlyFailMembers: boolean, }>; // If forceAddMembers is set, we will allow the viewer to add random users who // they aren't friends with. We will only fail if the viewer is trying to add // somebody who they have blocked or has blocked them. On the other hand, if // forceAddMembers is not set, we will fail if the viewer tries to add somebody // who they aren't friends with and doesn't have a membership row with a // nonnegative role for the parent thread. async function createThread( viewer: Viewer, request: ServerNewThreadRequest, options?: CreateThreadOptions, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const forceAddMembers = options?.forceAddMembers ?? false; const updatesForCurrentSession = options?.updatesForCurrentSession ?? 'return'; const silentlyFailMembers = options?.silentlyFailMembers ?? false; const threadType = request.type; const shouldCreateRelationships = forceAddMembers || threadType === threadTypes.PERSONAL; let parentThreadID = request.parentThreadID ? request.parentThreadID : null; const initialMemberIDsFromRequest = request.initialMemberIDs && request.initialMemberIDs.length > 0 ? [...new Set(request.initialMemberIDs)] : null; const ghostMemberIDsFromRequest = request.ghostMemberIDs && request.ghostMemberIDs.length > 0 ? [...new Set(request.ghostMemberIDs)] : null; const sourceMessageID = request.sourceMessageID ? request.sourceMessageID : null; invariant( threadType !== threadTypes.SIDEBAR || sourceMessageID, 'sourceMessageID should be set for sidebar', ); const parentRequirement = getThreadTypeParentRequirement(threadType); if ( (parentRequirement === 'required' && !parentThreadID) || (parentRequirement === 'disabled' && parentThreadID) ) { throw new ServerError('invalid_parameters'); } if ( threadType === threadTypes.PERSONAL && request.initialMemberIDs?.length !== 1 ) { throw new ServerError('invalid_parameters'); } const requestParentThreadID = parentThreadID; const confirmParentPermissionPromise = (async () => { if (!requestParentThreadID) { return; } const hasParentPermission = await checkThreadPermission( viewer, requestParentThreadID, threadType === threadTypes.SIDEBAR ? threadPermissions.CREATE_SIDEBARS : threadPermissions.CREATE_SUBCHANNELS, ); if (!hasParentPermission) { throw new ServerError('invalid_credentials'); } })(); // This is a temporary hack until we release actual E2E-encrypted local // conversations. For now we are hosting all root threads on Ashoat's // keyserver, so we set them to the have the Genesis community as their // parent thread. if (!parentThreadID && !threadTypeIsCommunityRoot(threadType)) { parentThreadID = genesis.id; } const determineThreadAncestryPromise = determineThreadAncestry( parentThreadID, threadType, ); const validateMembersPromise = (async () => { const threadAncestry = await determineThreadAncestryPromise; const defaultRolePermissions = getRolePermissionBlobs(threadType).Members; const { initialMemberIDs, ghostMemberIDs } = await validateCandidateMembers( viewer, { initialMemberIDs: initialMemberIDsFromRequest, ghostMemberIDs: ghostMemberIDsFromRequest, }, { threadType, parentThreadID, containingThreadID: threadAncestry.containingThreadID, defaultRolePermissions, }, { requireRelationship: !shouldCreateRelationships }, ); if ( !silentlyFailMembers && (Number(initialMemberIDs?.length) < Number(initialMemberIDsFromRequest?.length) || Number(ghostMemberIDs?.length) < Number(ghostMemberIDsFromRequest?.length)) ) { throw new ServerError('invalid_credentials'); } return { initialMemberIDs, ghostMemberIDs }; })(); - const checkPromises = {}; - checkPromises.confirmParentPermission = confirmParentPermissionPromise; - checkPromises.threadAncestry = determineThreadAncestryPromise; - checkPromises.validateMembers = validateMembersPromise; - if (sourceMessageID) { - checkPromises.sourceMessage = fetchMessageInfoByID(viewer, sourceMessageID); - } + const sourceMessagePromise: Promise = sourceMessageID + ? fetchMessageInfoByID(viewer, sourceMessageID) + : Promise.resolve(undefined); const { sourceMessage, threadAncestry, validateMembers: { initialMemberIDs, ghostMemberIDs }, - } = await promiseAll(checkPromises); + } = await promiseAll({ + sourceMessage: sourceMessagePromise, + threadAncestry: determineThreadAncestryPromise, + validateMembers: validateMembersPromise, + confirmParentPermission: confirmParentPermissionPromise, + }); if (sourceMessage && isInvalidSidebarSource(sourceMessage)) { throw new ServerError('invalid_parameters'); } let { id } = request; if (id === null || id === undefined) { const ids = await createIDs('threads', 1); id = ids[0]; } const newRoles = await createInitialRolesForNewThread(id, threadType); const name = request.name ? firstLine(request.name) : null; const description = request.description ? request.description : null; let color = request.color ? request.color.toLowerCase() : generateRandomColor(); if (threadType === threadTypes.PERSONAL) { color = generatePendingThreadColor([ ...(request.initialMemberIDs ?? []), viewer.id, ]); } const time = Date.now(); const row = [ id, threadType, name, description, viewer.userID, time, color, parentThreadID, threadAncestry.containingThreadID, threadAncestry.community, threadAncestry.depth, sourceMessageID, ]; let existingThreadQuery = null; if (threadType === threadTypes.PERSONAL) { const otherMemberID = initialMemberIDs?.[0]; invariant( otherMemberID, 'Other member id should be set for a PERSONAL thread', ); existingThreadQuery = personalThreadQuery(viewer.userID, otherMemberID); } else if (sourceMessageID) { existingThreadQuery = SQL` SELECT t.id FROM threads t WHERE t.source_message = ${sourceMessageID} `; } if (existingThreadQuery) { const query = SQL` INSERT INTO threads(id, type, name, description, creator, creation_time, color, parent_thread_id, containing_thread_id, community, depth, source_message) SELECT ${row} WHERE NOT EXISTS (`; query.append(existingThreadQuery).append(SQL`)`); const [result] = await dbQuery(query); if (result.affectedRows === 0) { const deleteRoles = SQL` DELETE FROM roles WHERE id IN (${newRoles.default.id}, ${newRoles.creator.id}) `; const deleteIDs = SQL` DELETE FROM ids WHERE id IN (${id}, ${newRoles.default.id}, ${newRoles.creator.id}) `; const [[existingThreadResult]] = await Promise.all([ dbQuery(existingThreadQuery), dbQuery(deleteRoles), dbQuery(deleteIDs), ]); invariant(existingThreadResult.length > 0, 'thread should exist'); const existingThreadID = existingThreadResult[0].id.toString(); invariant(request.calendarQuery, 'calendar query should exist'); const calendarQuery = { ...request.calendarQuery, filters: [ ...request.calendarQuery.filters, { type: 'threads', threadIDs: [existingThreadID] }, ], }; let joinUpdateInfos: $ReadOnlyArray = []; let userInfos: UserInfos = {}; let newMessageInfos: $ReadOnlyArray = []; if (threadType !== threadTypes.PERSONAL) { const joinThreadResult = await joinThread(viewer, { threadID: existingThreadID, calendarQuery, }); joinUpdateInfos = joinThreadResult.updatesResult.newUpdates; userInfos = joinThreadResult.userInfos; newMessageInfos = joinThreadResult.rawMessageInfos; } const { viewerUpdates: newUpdates, userInfos: changesetUserInfos } = await getChangesetCommitResultForExistingThread( viewer, existingThreadID, joinUpdateInfos, { calendarQuery, updatesForCurrentSession }, ); userInfos = { ...userInfos, ...changesetUserInfos }; return { newThreadID: existingThreadID, updatesResult: { newUpdates, }, userInfos, newMessageInfos, }; } } else { const query = SQL` INSERT INTO threads(id, type, name, description, creator, creation_time, color, parent_thread_id, containing_thread_id, community, depth, source_message) VALUES ${[row]} `; await dbQuery(query); } const initialMemberPromise: Promise = initialMemberIDs ? changeRole(id, initialMemberIDs, null, { setNewMembersToUnread: true }) : Promise.resolve(undefined); const ghostMemberPromise: Promise = ghostMemberIDs ? changeRole(id, ghostMemberIDs, -1) : Promise.resolve(undefined); const [ creatorChangeset, initialMembersChangeset, ghostMembersChangeset, recalculatePermissionsChangeset, ] = await Promise.all([ changeRole(id, [viewer.userID], newRoles.creator.id), initialMemberPromise, ghostMemberPromise, recalculateThreadPermissions(id), ]); const { membershipRows: creatorMembershipRows, relationshipChangeset: creatorRelationshipChangeset, } = creatorChangeset; const { membershipRows: recalculateMembershipRows, relationshipChangeset: recalculateRelationshipChangeset, } = recalculatePermissionsChangeset; const membershipRows = [ ...creatorMembershipRows, ...recalculateMembershipRows, ]; const relationshipChangeset = new RelationshipChangeset(); relationshipChangeset.addAll(creatorRelationshipChangeset); relationshipChangeset.addAll(recalculateRelationshipChangeset); if (initialMembersChangeset) { const { membershipRows: initialMembersMembershipRows, relationshipChangeset: initialMembersRelationshipChangeset, } = initialMembersChangeset; pushAll(membershipRows, initialMembersMembershipRows); relationshipChangeset.addAll(initialMembersRelationshipChangeset); } if (ghostMembersChangeset) { const { membershipRows: ghostMembersMembershipRows, relationshipChangeset: ghostMembersRelationshipChangeset, } = ghostMembersChangeset; pushAll(membershipRows, ghostMembersMembershipRows); relationshipChangeset.addAll(ghostMembersRelationshipChangeset); } const changeset = { membershipRows, relationshipChangeset }; const { viewerUpdates, userInfos } = await commitMembershipChangeset( viewer, changeset, { updatesForCurrentSession, }, ); const initialMemberAndCreatorIDs = initialMemberIDs ? [...initialMemberIDs, viewer.userID] : [viewer.userID]; const messageDatas: Array = []; if (threadType !== threadTypes.SIDEBAR) { messageDatas.push({ type: messageTypes.CREATE_THREAD, threadID: id, creatorID: viewer.userID, time, initialThreadState: { type: threadType, name, parentThreadID, color, memberIDs: initialMemberAndCreatorIDs, }, }); } else { invariant(parentThreadID, 'parentThreadID should be set for sidebar'); if (!sourceMessage) { throw new ServerError('invalid_parameters'); } invariant( sourceMessage.type !== messageTypes.REACTION && sourceMessage.type !== messageTypes.EDIT_MESSAGE && sourceMessage.type !== messageTypes.SIDEBAR_SOURCE && sourceMessage.type !== messageTypes.TOGGLE_PIN, 'Invalid sidebar source type', ); let editedSourceMessage = sourceMessage; if (sourceMessageID && sourceMessage.type === messageTypes.TEXT) { const editMessageContent = await fetchLatestEditMessageContentByID( sourceMessageID, ); if (editMessageContent) { editedSourceMessage = { ...sourceMessage, text: editMessageContent.text, }; } } messageDatas.push( { type: messageTypes.SIDEBAR_SOURCE, threadID: id, creatorID: viewer.userID, time, sourceMessage: editedSourceMessage, }, { type: messageTypes.CREATE_SIDEBAR, threadID: id, creatorID: viewer.userID, time, sourceMessageAuthorID: sourceMessage.creatorID, initialThreadState: { name, parentThreadID, color, memberIDs: initialMemberAndCreatorIDs, }, }, ); } if ( parentThreadID && threadType !== threadTypes.SIDEBAR && (parentThreadID !== genesis.id || threadType === threadTypes.COMMUNITY_OPEN_SUBTHREAD || threadType === threadTypes.COMMUNITY_OPEN_ANNOUNCEMENT_SUBTHREAD) ) { messageDatas.push({ type: messageTypes.CREATE_SUB_THREAD, threadID: parentThreadID, creatorID: viewer.userID, time, childThreadID: id, }); } const newMessageInfos = await createMessages( viewer, messageDatas, updatesForCurrentSession, ); return { newThreadID: id, updatesResult: { newUpdates: viewerUpdates, }, userInfos, newMessageInfos, }; } function createPrivateThread(viewer: Viewer): Promise { return createThread( viewer, { type: threadTypes.PRIVATE, description: privateThreadDescription, ghostMemberIDs: [commbot.userID], }, { forceAddMembers: true, }, ); } export { createThread, createPrivateThread, privateThreadDescription }; diff --git a/keyserver/src/creators/update-creator.js b/keyserver/src/creators/update-creator.js index b961fa20f..d091e0665 100644 --- a/keyserver/src/creators/update-creator.js +++ b/keyserver/src/creators/update-creator.js @@ -1,624 +1,662 @@ // @flow import invariant from 'invariant'; import { nonThreadCalendarFilters } from 'lib/selectors/calendar-filter-selectors.js'; import { keyForUpdateData, keyForUpdateInfo, rawUpdateInfoFromUpdateData, } from 'lib/shared/update-utils.js'; import type { UpdateInfosRawData, UpdateTypes, } from 'lib/shared/updates/update-spec.js'; import { updateSpecs } from 'lib/shared/updates/update-specs.js'; import { type CalendarQuery, defaultCalendarQuery, + type RawEntryInfos, type RawEntryInfo, + type FetchEntryInfosBase, } from 'lib/types/entry-types.js'; import { defaultNumberPerThread, type MessageSelectionCriteria, + type FetchMessageInfosResult, type RawMessageInfo, } from 'lib/types/message-types.js'; import { type UpdateTarget, redisMessageTypes, type NewUpdatesRedisMessage, } from 'lib/types/redis-types.js'; import type { RawThreadInfos } from 'lib/types/thread-types'; import { type ServerUpdateInfo, type UpdateData, type RawUpdateInfo, type CreateUpdatesResult, } from 'lib/types/update-types.js'; -import type { UserInfos } from 'lib/types/user-types.js'; +import type { UserInfos, LoggedInUserInfo } from 'lib/types/user-types.js'; import { promiseAll } from 'lib/utils/promises.js'; import createIDs from './id-creator.js'; import { dbQuery, SQL, mergeAndConditions } from '../database/database.js'; import type { SQLStatementType } from '../database/types.js'; import { deleteUpdatesByConditions } from '../deleters/update-deleters.js'; import { fetchEntryInfos, fetchEntryInfosByID, } from '../fetchers/entry-fetchers.js'; import { fetchMessageInfos } from '../fetchers/message-fetchers.js'; -import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; +import { + fetchThreadInfos, + type FetchThreadInfosResult, +} from '../fetchers/thread-fetchers.js'; import { fetchKnownUserInfos, fetchCurrentUserInfo, } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { channelNameForUpdateTarget, publisher } from '../socket/redis.js'; export type UpdatesForCurrentSession = // This is the default if no Viewer is passed, or if an isSocket Viewer is // passed in. We will broadcast to all valid sessions via Redis and return // nothing to the caller, relying on the current session's Redis listener to // pick up the updates and deliver them asynchronously. | 'broadcast' // This is the default if a non-isSocket Viewer is passed in. We avoid // broadcasting the update to the current session, and instead return the // update to the caller, who will handle delivering it to the client. | 'return' // This means we ignore any updates destined for the current session. // Presumably the caller knows what they are doing and has a different way of // communicating the relevant information to the client. | 'ignore'; type DeleteCondition = { +userID: string, +target: ?string, +types: UpdateTypes, }; export type ViewerInfo = | { viewer: Viewer, calendarQuery?: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, } | { viewer: Viewer, calendarQuery: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, threadInfos: RawThreadInfos, }; const defaultUpdateCreationResult = { viewerUpdates: [], userInfos: {} }; const sortFunction = ( a: UpdateData | ServerUpdateInfo, b: UpdateData | ServerUpdateInfo, ) => a.time - b.time; const deleteUpdatesBatchSize = 500; // Creates rows in the updates table based on the inputed updateDatas. Returns // UpdateInfos pertaining to the provided viewerInfo, as well as related // UserInfos. If no viewerInfo is provided, no UpdateInfos will be returned. And // the update row won't have an updater column, meaning no session will be // excluded from the update. async function createUpdates( updateDatas: $ReadOnlyArray, passedViewerInfo?: ?ViewerInfo, ): Promise { if (updateDatas.length === 0) { return defaultUpdateCreationResult; } // viewer.session will throw for a script Viewer let viewerInfo = passedViewerInfo; if ( viewerInfo && (viewerInfo.viewer.isScriptViewer || !viewerInfo.viewer.loggedIn) ) { viewerInfo = null; } const sortedUpdateDatas = [...updateDatas].sort(sortFunction); const filteredUpdateDatas: UpdateData[] = []; const keyedUpdateDatas: Map = new Map(); for (const updateData of sortedUpdateDatas) { const key = keyForUpdateData(updateData); if (!key) { filteredUpdateDatas.push(updateData); continue; } const conditionKey = `${updateData.userID}|${key}`; const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} has conditionKey ` + `${conditionKey} but no deleteCondition`, ); const curUpdateDatas = keyedUpdateDatas.get(conditionKey); if (!curUpdateDatas) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const filteredCurrent = curUpdateDatas.filter(curUpdateData => filterOnDeleteCondition(curUpdateData, deleteCondition), ); if (filteredCurrent.length === 0) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const isNewUpdateDataFiltered = !filteredCurrent.every(curUpdateData => { const curDeleteCondition = getDeleteCondition(curUpdateData); invariant( curDeleteCondition, `updateData of type ${curUpdateData.type} is in keyedUpdateDatas ` + "but doesn't have a deleteCondition", ); return filterOnDeleteCondition(updateData, curDeleteCondition); }); if (!isNewUpdateDataFiltered) { filteredCurrent.push(updateData); } keyedUpdateDatas.set(conditionKey, filteredCurrent); } for (const keyUpdateDatas of keyedUpdateDatas.values()) { filteredUpdateDatas.push(...keyUpdateDatas); } const ids = await createIDs('updates', filteredUpdateDatas.length); let updatesForCurrentSession = viewerInfo && viewerInfo.updatesForCurrentSession; if (!updatesForCurrentSession && viewerInfo) { updatesForCurrentSession = viewerInfo.viewer.isSocket ? 'broadcast' : 'return'; } else if (!updatesForCurrentSession) { updatesForCurrentSession = 'broadcast'; } const dontBroadcastSession = updatesForCurrentSession !== 'broadcast' && viewerInfo ? viewerInfo.viewer.session : null; const publishInfos: Map = new Map(); const viewerRawUpdateInfos: RawUpdateInfo[] = []; const insertRows: (?(number | string))[][] = []; const earliestTime: Map = new Map(); for (let i = 0; i < filteredUpdateDatas.length; i++) { const updateData = filteredUpdateDatas[i]; const target = getTargetFromUpdateData(updateData); const rawUpdateInfo = rawUpdateInfoFromUpdateData(updateData, ids[i]); if (!target || !dontBroadcastSession || target !== dontBroadcastSession) { const updateTarget = target ? { userID: updateData.userID, sessionID: target } : { userID: updateData.userID }; const channelName = channelNameForUpdateTarget(updateTarget); let publishInfo = publishInfos.get(channelName); if (!publishInfo) { publishInfo = { updateTarget, rawUpdateInfos: [] }; publishInfos.set(channelName, publishInfo); } publishInfo.rawUpdateInfos.push(rawUpdateInfo); } if ( updatesForCurrentSession === 'return' && viewerInfo && updateData.userID === viewerInfo.viewer.id && (!target || target === viewerInfo.viewer.session) ) { viewerRawUpdateInfos.push(rawUpdateInfo); } if (viewerInfo && target && viewerInfo.viewer.session === target) { // In the case where this update is being created only for the current // session, there's no reason to insert a row into the updates table continue; } const content = updateSpecs[updateData.type].updateContentForServerDB(updateData); const key = keyForUpdateData(updateData); if (key) { const conditionKey = `${updateData.userID}|${key}`; const currentEarliestTime = earliestTime.get(conditionKey); if (!currentEarliestTime || updateData.time < currentEarliestTime) { earliestTime.set(conditionKey, updateData.time); } } const insertRow = [ ids[i], updateData.userID, updateData.type, key, content, updateData.time, dontBroadcastSession, target, ]; insertRows.push(insertRow); } type DeleteUpdatesConditions = { key: string, target?: string, types?: number[], time?: number, }; const usersByConditions: Map< string, { conditions: DeleteUpdatesConditions, users: Set, }, > = new Map(); for (const [conditionKey, keyUpdateDatas] of keyedUpdateDatas) { const deleteConditionByTarget: Map = new Map(); for (const updateData of keyUpdateDatas) { const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} is in keyedUpdateDatas but ` + "doesn't have a deleteCondition", ); const { target, types } = deleteCondition; const existingDeleteCondition = deleteConditionByTarget.get(target); if (!existingDeleteCondition) { deleteConditionByTarget.set(target, deleteCondition); continue; } const existingTypes = existingDeleteCondition.types; if (existingTypes === 'all_types') { continue; } else if (types === 'all_types') { deleteConditionByTarget.set(target, deleteCondition); continue; } const mergedTypes = new Set([...types, ...existingTypes]); deleteConditionByTarget.set(target, { ...deleteCondition, types: mergedTypes, }); } for (const deleteCondition of deleteConditionByTarget.values()) { const { userID, target, types } = deleteCondition; const key = conditionKey.split('|')[1]; const conditions: DeleteUpdatesConditions = { key }; if (target) { conditions.target = target; } if (types !== 'all_types') { invariant(types.size > 0, 'deleteCondition had empty types set'); conditions.types = [...types]; } const earliestTimeForCondition = earliestTime.get(conditionKey); if (earliestTimeForCondition) { conditions.time = earliestTimeForCondition; } const conditionsKey = JSON.stringify(conditions); if (!usersByConditions.has(conditionsKey)) { usersByConditions.set(conditionsKey, { conditions, users: new Set(), }); } usersByConditions.get(conditionsKey)?.users.add(userID); } } const deleteSQLConditions: SQLStatementType[] = []; for (const { conditions, users } of usersByConditions.values()) { const sqlConditions = [ SQL`u.user IN (${[...users]})`, SQL`u.key = ${conditions.key}`, ]; if (conditions.target) { sqlConditions.push(SQL`u.target = ${conditions.target}`); } if (conditions.types) { sqlConditions.push(SQL`u.type IN (${conditions.types})`); } if (conditions.time) { sqlConditions.push(SQL`u.time < ${conditions.time}`); } deleteSQLConditions.push(mergeAndConditions(sqlConditions)); } - const promises = {}; - - if (insertRows.length > 0) { + const insertPromise = (async () => { + if (insertRows.length === 0) { + return; + } const insertQuery = SQL` INSERT INTO updates(id, user, type, \`key\`, content, time, updater, target) `; insertQuery.append(SQL`VALUES ${insertRows}`); - promises.insert = dbQuery(insertQuery); - } + await dbQuery(insertQuery); + })(); - if (publishInfos.size > 0) { - promises.redis = redisPublish(publishInfos.values(), dontBroadcastSession); - } + const redisPromise = + publishInfos.size > 0 + ? redisPublish(publishInfos.values(), dontBroadcastSession) + : Promise.resolve(undefined); - if (deleteSQLConditions.length > 0) { - promises.delete = (async () => { - while (deleteSQLConditions.length > 0) { - const batch = deleteSQLConditions.splice(0, deleteUpdatesBatchSize); - await deleteUpdatesByConditions(batch); - } - })(); - } + const deletePromise = (async () => { + if (deleteSQLConditions.length === 0) { + return; + } + while (deleteSQLConditions.length > 0) { + const batch = deleteSQLConditions.splice(0, deleteUpdatesBatchSize); + await deleteUpdatesByConditions(batch); + } + })(); - if (viewerRawUpdateInfos.length > 0) { + const updatesPromise: Promise = (async () => { + if (viewerRawUpdateInfos.length === 0) { + return undefined; + } invariant(viewerInfo, 'should be set'); - promises.updatesResult = fetchUpdateInfosWithRawUpdateInfos( + return await fetchUpdateInfosWithRawUpdateInfos( viewerRawUpdateInfos, viewerInfo, ); - } + })(); - const { updatesResult } = await promiseAll(promises); + const { updatesResult } = await promiseAll({ + updatesResult: updatesPromise, + insertResult: insertPromise, + redisResult: redisPromise, + deleteResult: deletePromise, + }); if (!updatesResult) { return defaultUpdateCreationResult; } const { updateInfos, userInfos } = updatesResult; return { viewerUpdates: updateInfos, userInfos }; } export type FetchUpdatesResult = { +updateInfos: $ReadOnlyArray, +userInfos: UserInfos, }; async function fetchUpdateInfosWithRawUpdateInfos( rawUpdateInfos: $ReadOnlyArray, viewerInfo: ViewerInfo, ): Promise { const entitiesToFetch = rawUpdateInfos .map(info => updateSpecs[info.type].entitiesToFetch?.(info)) .filter(Boolean); const currentUserNeedsFetch = entitiesToFetch.some( ({ currentUser }) => currentUser, ); const threadIDsNeedingFetch = viewerInfo.threadInfos ? new Set() : new Set(entitiesToFetch.map(({ threadID }) => threadID).filter(Boolean)); const entryIDsNeedingFetch = new Set( entitiesToFetch.map(({ entryID }) => entryID).filter(Boolean), ); // entries and messages const threadIDsNeedingDetailedFetch = new Set( entitiesToFetch .map(({ detailedThreadID }) => detailedThreadID) .filter(Boolean), ); const userIDsToFetch = new Set( entitiesToFetch.map(({ userID }) => userID).filter(Boolean), ); - const promises = {}; - const { viewer } = viewerInfo; - if (!viewerInfo.threadInfos && threadIDsNeedingFetch.size > 0) { - promises.threadResult = fetchThreadInfos(viewer, { + const threadPromise: Promise = (async () => { + if (viewerInfo.threadInfos || threadIDsNeedingFetch.size === 0) { + return undefined; + } + return await fetchThreadInfos(viewer, { threadIDs: threadIDsNeedingFetch, }); - } + })(); - let calendarQuery: ?CalendarQuery = viewerInfo.calendarQuery - ? viewerInfo.calendarQuery - : null; - if (!calendarQuery && viewer.hasSessionInfo) { + let calendarQueryTmp: ?CalendarQuery = viewerInfo.calendarQuery ?? null; + if (!calendarQueryTmp && viewer.hasSessionInfo) { // This should only ever happen for "legacy" clients who call in without // providing this information. These clients wouldn't know how to deal with // the corresponding UpdateInfos anyways, so no reason to be worried. - calendarQuery = viewer.calendarQuery; - } else if (!calendarQuery) { - calendarQuery = defaultCalendarQuery(viewer.platform, viewer.timeZone); + calendarQueryTmp = viewer.calendarQuery; + } else if (!calendarQueryTmp) { + calendarQueryTmp = defaultCalendarQuery(viewer.platform, viewer.timeZone); } - if (threadIDsNeedingDetailedFetch.size > 0) { + const calendarQuery = calendarQueryTmp; + + const messageInfosPromise: Promise = (async () => { + if (threadIDsNeedingDetailedFetch.size === 0) { + return undefined; + } const threadCursors: { [string]: ?string } = {}; for (const threadID of threadIDsNeedingDetailedFetch) { threadCursors[threadID] = null; } const messageSelectionCriteria: MessageSelectionCriteria = { threadCursors, }; - promises.messageInfosResult = fetchMessageInfos( + return await fetchMessageInfos( viewer, messageSelectionCriteria, defaultNumberPerThread, ); + })(); + + const calendarPromise: Promise = (async () => { + if (threadIDsNeedingDetailedFetch.size === 0) { + return undefined; + } const threadCalendarQuery = { ...calendarQuery, filters: [ ...nonThreadCalendarFilters(calendarQuery.filters), { type: 'threads', threadIDs: [...threadIDsNeedingDetailedFetch] }, ], }; - promises.calendarResult = fetchEntryInfos(viewer, [threadCalendarQuery]); - } + return await fetchEntryInfos(viewer, [threadCalendarQuery]); + })(); - if (entryIDsNeedingFetch.size > 0) { - promises.entryInfosResult = fetchEntryInfosByID( - viewer, - entryIDsNeedingFetch, - ); - } - - if (currentUserNeedsFetch) { - promises.currentUserInfoResult = (async () => { - const currentUserInfo = await fetchCurrentUserInfo(viewer); - invariant(currentUserInfo.anonymous === undefined, 'should be logged in'); - return currentUserInfo; - })(); - } + const entryInfosPromise: Promise = (async () => { + if (entryIDsNeedingFetch.size === 0) { + return undefined; + } + return await fetchEntryInfosByID(viewer, entryIDsNeedingFetch); + })(); - if (userIDsToFetch.size > 0) { - promises.userInfosResult = fetchKnownUserInfos(viewer, [...userIDsToFetch]); - } + const currentUserInfoPromise: Promise = (async () => { + if (!currentUserNeedsFetch) { + return undefined; + } + const currentUserInfo = await fetchCurrentUserInfo(viewer); + invariant(currentUserInfo.anonymous === undefined, 'should be logged in'); + return currentUserInfo; + })(); + + const userInfosPromise: Promise = (async () => { + if (userIDsToFetch.size === 0) { + return undefined; + } + return await fetchKnownUserInfos(viewer, [...userIDsToFetch]); + })(); const { threadResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, userInfosResult, - } = await promiseAll(promises); + } = await promiseAll({ + threadResult: threadPromise, + messageInfosResult: messageInfosPromise, + calendarResult: calendarPromise, + entryInfosResult: entryInfosPromise, + currentUserInfoResult: currentUserInfoPromise, + userInfosResult: userInfosPromise, + }); let threadInfos = {}; if (viewerInfo.threadInfos) { threadInfos = viewerInfo.threadInfos; } else if (threadResult) { threadInfos = threadResult.threadInfos; } return await updateInfosFromRawUpdateInfos(viewer, rawUpdateInfos, { threadInfos, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, userInfosResult, }); } async function updateInfosFromRawUpdateInfos( viewer: Viewer, rawUpdateInfos: $ReadOnlyArray, rawData: UpdateInfosRawData, ): Promise { const { messageInfosResult, calendarResult, userInfosResult } = rawData; const rawEntryInfosByThreadID: { [string]: Array } = {}; for (const entryInfo of calendarResult?.rawEntryInfos ?? []) { if (!rawEntryInfosByThreadID[entryInfo.threadID]) { rawEntryInfosByThreadID[entryInfo.threadID] = []; } rawEntryInfosByThreadID[entryInfo.threadID].push(entryInfo); } const rawMessageInfosByThreadID: { [string]: Array } = {}; for (const messageInfo of messageInfosResult?.rawMessageInfos ?? []) { if (!rawMessageInfosByThreadID[messageInfo.threadID]) { rawMessageInfosByThreadID[messageInfo.threadID] = []; } rawMessageInfosByThreadID[messageInfo.threadID].push(messageInfo); } const params = { data: rawData, rawEntryInfosByThreadID, rawMessageInfosByThreadID, }; const updateInfos = rawUpdateInfos .map(update => updateSpecs[update.type].updateInfoFromRawInfo(update, params), ) .filter(Boolean); updateInfos.sort(sortFunction); // Now we'll attempt to merge UpdateInfos so that we only have one per key const updateForKey: Map = new Map(); const mergedUpdates: ServerUpdateInfo[] = []; for (const updateInfo of updateInfos) { const key = keyForUpdateInfo(updateInfo); if (!key) { mergedUpdates.push(updateInfo); continue; } const typesOfReplacedUpdatesForMatchingKey = updateSpecs[updateInfo.type].typesOfReplacedUpdatesForMatchingKey; const currentUpdateInfo = updateForKey.get(key); if ( !currentUpdateInfo || typesOfReplacedUpdatesForMatchingKey === 'all_types' || typesOfReplacedUpdatesForMatchingKey?.has(currentUpdateInfo.type) ) { updateForKey.set(key, updateInfo); } } for (const [, updateInfo] of updateForKey) { mergedUpdates.push(updateInfo); } mergedUpdates.sort(sortFunction); return { updateInfos: mergedUpdates, userInfos: userInfosResult ?? {} }; } type PublishInfo = { updateTarget: UpdateTarget, rawUpdateInfos: RawUpdateInfo[], }; async function redisPublish( publishInfos: Iterator, dontBroadcastSession: ?string, ): Promise { for (const publishInfo of publishInfos) { const { updateTarget, rawUpdateInfos } = publishInfo; const redisMessage: NewUpdatesRedisMessage = { type: redisMessageTypes.NEW_UPDATES, updates: rawUpdateInfos, }; if (!updateTarget.sessionID && dontBroadcastSession) { redisMessage.ignoreSession = dontBroadcastSession; } publisher.sendMessage(updateTarget, redisMessage); } } function getTargetFromUpdateData(updateData: UpdateData): ?string { if (updateData.targetSession) { return updateData.targetSession; } else if (updateData.targetCookie) { return updateData.targetCookie; } else { return null; } } function getDeleteCondition(updateData: UpdateData): ?DeleteCondition { const types = updateSpecs[updateData.type].deleteCondition; if (!types) { return null; } const target = getTargetFromUpdateData(updateData); const { userID } = updateData; return { userID, target, types }; } function filterOnDeleteCondition( updateData: UpdateData, deleteCondition: DeleteCondition, ): boolean { invariant( updateData.userID === deleteCondition.userID, `updateData of type ${updateData.type} being compared to wrong userID`, ); if (deleteCondition.target) { const target = getTargetFromUpdateData(updateData); if (target !== deleteCondition.target) { return true; } } if (deleteCondition.types === 'all_types') { return false; } return !deleteCondition.types.has(updateData.type); } export { createUpdates, fetchUpdateInfosWithRawUpdateInfos }; diff --git a/keyserver/src/deleters/account-deleters.js b/keyserver/src/deleters/account-deleters.js index cba7122f8..836c60da2 100644 --- a/keyserver/src/deleters/account-deleters.js +++ b/keyserver/src/deleters/account-deleters.js @@ -1,146 +1,153 @@ // @flow import { getRustAPI } from 'rust-node-addon'; import type { LogOutResponse } from 'lib/types/account-types.js'; import type { ReservedUsernameMessage } from 'lib/types/crypto-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import type { UserInfo } from 'lib/types/user-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchKnownUserInfos, fetchUsername, } from '../fetchers/user-fetchers.js'; import { rescindPushNotifs } from '../push/rescind.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { createNewAnonymousCookie } from '../session/cookies.js'; -import type { Viewer } from '../session/viewer.js'; +import type { Viewer, AnonymousViewerData } from '../session/viewer.js'; import { fetchOlmAccount } from '../updaters/olm-account-updater.js'; async function deleteAccount(viewer: Viewer): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const deletedUserID = viewer.userID; await rescindPushNotifs(SQL`n.user = ${deletedUserID}`, SQL`NULL`); const knownUserInfos = await fetchKnownUserInfos(viewer); const usersToUpdate: $ReadOnlyArray = values(knownUserInfos).filter( (user: UserInfo): boolean => user.id !== deletedUserID, ); // TODO: if this results in any orphaned orgs, convert them to chats const deletionQuery = SQL` START TRANSACTION; DELETE FROM users WHERE id = ${deletedUserID}; DELETE FROM ids WHERE id = ${deletedUserID}; DELETE c, i FROM cookies c LEFT JOIN ids i ON i.id = c.id WHERE c.user = ${deletedUserID}; DELETE FROM memberships WHERE user = ${deletedUserID}; DELETE FROM focused WHERE user = ${deletedUserID}; DELETE n, i FROM notifications n LEFT JOIN ids i ON i.id = n.id WHERE n.user = ${deletedUserID}; DELETE u, i FROM updates u LEFT JOIN ids i ON i.id = u.id WHERE u.user = ${deletedUserID}; DELETE s, i FROM sessions s LEFT JOIN ids i ON i.id = s.id WHERE s.user = ${deletedUserID}; DELETE r, i FROM reports r LEFT JOIN ids i ON i.id = r.id WHERE r.user = ${deletedUserID}; DELETE u, i FROM uploads u LEFT JOIN ids i on i.id = u.id WHERE u.container = ${deletedUserID}; DELETE FROM relationships_undirected WHERE user1 = ${deletedUserID}; DELETE FROM relationships_undirected WHERE user2 = ${deletedUserID}; DELETE FROM relationships_directed WHERE user1 = ${deletedUserID}; DELETE FROM relationships_directed WHERE user2 = ${deletedUserID}; COMMIT; `; - const promises = {}; - promises.deletion = dbQuery(deletionQuery, { multipleStatements: true }); - if (!viewer.isScriptViewer) { - promises.anonymousViewerData = createNewAnonymousCookie({ - platformDetails: viewer.platformDetails, - deviceToken: viewer.deviceToken, - }); - } - promises.username = fetchUsername(deletedUserID); - const { anonymousViewerData, username } = await promiseAll(promises); + const deletionPromise = dbQuery(deletionQuery, { multipleStatements: true }); + const anonymousViewerDataPromise: Promise = + (async () => { + if (viewer.isScriptViewer) { + return undefined; + } + return await createNewAnonymousCookie({ + platformDetails: viewer.platformDetails, + deviceToken: viewer.deviceToken, + }); + })(); + const usernamePromise = fetchUsername(deletedUserID); + const { anonymousViewerData, username } = await promiseAll({ + anonymousViewerData: anonymousViewerDataPromise, + username: usernamePromise, + deletion: deletionPromise, + }); if (username) { const issuedAt = new Date().toISOString(); const reservedUsernameMessage: ReservedUsernameMessage = { statement: 'Remove the following username from reserved list', payload: username, issuedAt, }; const message = JSON.stringify(reservedUsernameMessage); handleAsyncPromise( (async () => { const rustAPI = await getRustAPI(); const accountInfo = await fetchOlmAccount('content'); const signature = accountInfo.account.sign(message); await rustAPI.removeReservedUsername(message, signature); })(), ); } if (anonymousViewerData) { viewer.setNewCookie(anonymousViewerData); } const deletionUpdatesPromise = createAccountDeletionUpdates( usersToUpdate, deletedUserID, ); if (viewer.isScriptViewer) { await deletionUpdatesPromise; } else { handleAsyncPromise(deletionUpdatesPromise); } if (viewer.isScriptViewer) { return null; } return { currentUserInfo: { anonymous: true, }, }; } async function createAccountDeletionUpdates( knownUserInfos: $ReadOnlyArray, deletedUserID: string, ): Promise { const time = Date.now(); const updateDatas = []; for (const userInfo of knownUserInfos) { const { id: userID } = userInfo; updateDatas.push({ type: updateTypes.DELETE_ACCOUNT, userID, time, deletedUserID, }); } await createUpdates(updateDatas); } export { deleteAccount }; diff --git a/keyserver/src/fetchers/thread-fetchers.js b/keyserver/src/fetchers/thread-fetchers.js index a8495986a..7d8c20571 100644 --- a/keyserver/src/fetchers/thread-fetchers.js +++ b/keyserver/src/fetchers/thread-fetchers.js @@ -1,415 +1,415 @@ // @flow import invariant from 'invariant'; import { specialRoles } from 'lib/permissions/special-roles.js'; import { getAllThreadPermissions } from 'lib/permissions/thread-permissions.js'; import { rawThreadInfoFromServerThreadInfo, getContainingThreadID, getCommunity, } from 'lib/shared/thread-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { AvatarDBContent, ClientAvatar } from 'lib/types/avatar-types.js'; import type { RawMessageInfo, MessageInfo } from 'lib/types/message-types.js'; import { threadTypes, type ThreadType } from 'lib/types/thread-types-enum.js'; import { type RawThreadInfos, type ServerThreadInfo, type RawThreadInfo, } from 'lib/types/thread-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { getUploadURL, makeUploadURI } from './upload-fetchers.js'; import { dbQuery, SQL, mergeAndConditions } from '../database/database.js'; import type { SQLStatementType } from '../database/types.js'; import type { Viewer } from '../session/viewer.js'; type FetchThreadInfosFilter = $Shape<{ +accessibleToUserID: string, +threadID: string, +threadIDs: $ReadOnlySet, +parentThreadID: string, +sourceMessageID: string, }>; function constructWhereClause( filter: FetchThreadInfosFilter, ): SQLStatementType { const fromTable = filter.accessibleToUserID ? 'memberships' : 'threads'; const conditions = []; if (filter.accessibleToUserID) { conditions.push( SQL`mm.user = ${filter.accessibleToUserID} AND mm.role > -1`, ); } if (filter.threadID && fromTable === 'memberships') { conditions.push(SQL`mm.thread = ${filter.threadID}`); } else if (filter.threadID) { conditions.push(SQL`t.id = ${filter.threadID}`); } if (filter.threadIDs && fromTable === 'memberships') { conditions.push(SQL`mm.thread IN (${[...filter.threadIDs]})`); } else if (filter.threadIDs) { conditions.push(SQL`t.id IN (${[...filter.threadIDs]})`); } if (filter.parentThreadID) { conditions.push(SQL`t.parent_thread_id = ${filter.parentThreadID}`); } if (filter.sourceMessageID) { conditions.push(SQL`t.source_message = ${filter.sourceMessageID}`); } if (conditions.length === 0) { return SQL``; } const clause = mergeAndConditions(conditions); return SQL`WHERE `.append(clause); } type FetchServerThreadInfosResult = { +threadInfos: { +[id: string]: ServerThreadInfo }, }; async function fetchServerThreadInfos( filter?: FetchThreadInfosFilter, ): Promise { if (filter?.threadIDs?.size === 0) { return { threadInfos: {} }; } let primaryFetchClause; if (filter?.accessibleToUserID) { primaryFetchClause = SQL` FROM memberships mm LEFT JOIN threads t ON t.id = mm.thread `; } else { primaryFetchClause = SQL` FROM threads t `; } const whereClause = filter ? constructWhereClause(filter) : ''; const rolesQuery = SQL` SELECT t.id, r.id AS role, r.name, r.permissions, r.special_role = ${specialRoles.DEFAULT_ROLE} AS is_default ` .append(primaryFetchClause) .append( SQL` LEFT JOIN roles r ON r.thread = t.id `, ) .append(whereClause); const threadsQuery = SQL` SELECT t.id, t.name, t.parent_thread_id, t.containing_thread_id, t.community, t.depth, t.color, t.description, t.type, t.creation_time, t.source_message, t.replies_count, t.avatar, t.pinned_count, m.user, m.role, m.permissions, m.subscription, m.last_read_message < m.last_message AS unread, m.sender, up.id AS upload_id, up.secret AS upload_secret, up.extra AS upload_extra ` .append(primaryFetchClause) .append( SQL` LEFT JOIN memberships m ON m.thread = t.id AND m.role >= 0 LEFT JOIN uploads up ON up.container = t.id `, ) .append(whereClause) .append(SQL` ORDER BY m.user ASC`); const [[threadsResult], [rolesResult]] = await Promise.all([ dbQuery(threadsQuery), dbQuery(rolesQuery), ]); const threadInfos = {}; for (const threadsRow of threadsResult) { const threadID = threadsRow.id.toString(); if (!threadInfos[threadID]) { threadInfos[threadID] = { id: threadID, type: threadsRow.type, name: threadsRow.name ? threadsRow.name : '', description: threadsRow.description ? threadsRow.description : '', color: threadsRow.color, creationTime: threadsRow.creation_time, parentThreadID: threadsRow.parent_thread_id ? threadsRow.parent_thread_id.toString() : null, containingThreadID: threadsRow.containing_thread_id ? threadsRow.containing_thread_id.toString() : null, depth: threadsRow.depth, community: threadsRow.community ? threadsRow.community.toString() : null, members: [], roles: {}, repliesCount: threadsRow.replies_count, pinnedCount: threadsRow.pinned_count, }; if (threadsRow.avatar) { const avatar: AvatarDBContent = JSON.parse(threadsRow.avatar); let clientAvatar: ?ClientAvatar; if ( avatar && avatar.type !== 'image' && avatar.type !== 'encrypted_image' ) { clientAvatar = avatar; } else if ( avatar && (avatar.type === 'image' || avatar.type === 'encrypted_image') && threadsRow.upload_id && threadsRow.upload_secret ) { const uploadID = threadsRow.upload_id.toString(); invariant( uploadID === avatar.uploadID, `uploadID of upload should match uploadID of image avatar`, ); if (avatar.type === 'encrypted_image' && threadsRow.upload_extra) { const uploadExtra = JSON.parse(threadsRow.upload_extra); clientAvatar = { type: 'encrypted_image', blobURI: makeUploadURI( uploadExtra.blobHash, uploadID, threadsRow.upload_secret, ), encryptionKey: uploadExtra.encryptionKey, thumbHash: uploadExtra.thumbHash, }; } else { clientAvatar = { type: 'image', uri: getUploadURL(uploadID, threadsRow.upload_secret), }; } } threadInfos[threadID] = { ...threadInfos[threadID], avatar: clientAvatar, }; } } const sourceMessageID = threadsRow.source_message?.toString(); if (sourceMessageID) { threadInfos[threadID].sourceMessageID = sourceMessageID; } if (threadsRow.user) { const userID = threadsRow.user.toString(); const allPermissions = getAllThreadPermissions( JSON.parse(threadsRow.permissions), threadID, ); threadInfos[threadID].members.push({ id: userID, permissions: allPermissions, role: threadsRow.role ? threadsRow.role.toString() : null, subscription: JSON.parse(threadsRow.subscription), unread: threadsRow.role ? !!threadsRow.unread : null, isSender: !!threadsRow.sender, }); } } for (const rolesRow of rolesResult) { const threadID = rolesRow.id.toString(); if (!rolesRow.role) { continue; } const role = rolesRow.role.toString(); if (!threadInfos[threadID].roles[role]) { threadInfos[threadID].roles[role] = { id: role, name: rolesRow.name, permissions: JSON.parse(rolesRow.permissions), isDefault: Boolean(rolesRow.is_default), }; } } return { threadInfos }; } -type FetchThreadInfosResult = { +export type FetchThreadInfosResult = { +threadInfos: RawThreadInfos, }; async function fetchThreadInfos( viewer: Viewer, inputFilter?: FetchThreadInfosFilter, ): Promise { const filter = { accessibleToUserID: viewer.id, ...inputFilter, }; const serverResult = await fetchServerThreadInfos(filter); return rawThreadInfosFromServerThreadInfos(viewer, serverResult); } function rawThreadInfosFromServerThreadInfos( viewer: Viewer, serverResult: FetchServerThreadInfosResult, ): FetchThreadInfosResult { const viewerID = viewer.id; const codeVersionBelow209 = !hasMinCodeVersion(viewer.platformDetails, { native: 209, }); const codeVersionBelow213 = !hasMinCodeVersion(viewer.platformDetails, { native: 213, }); const codeVersionBelow221 = !hasMinCodeVersion(viewer.platformDetails, { native: 221, }); const codeVersionBelow283 = !hasMinCodeVersion(viewer.platformDetails, { native: 285, }); const threadInfos: { [string]: RawThreadInfo } = {}; for (const threadID in serverResult.threadInfos) { const serverThreadInfo = serverResult.threadInfos[threadID]; const threadInfo = rawThreadInfoFromServerThreadInfo( serverThreadInfo, viewerID, { filterThreadEditAvatarPermission: codeVersionBelow213, excludePinInfo: codeVersionBelow209, filterManageInviteLinksPermission: codeVersionBelow221, filterVoicedInAnnouncementChannelsPermission: codeVersionBelow283, }, ); if (threadInfo) { threadInfos[threadID] = threadInfo; } } return { threadInfos }; } async function verifyThreadIDs( threadIDs: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { if (threadIDs.length === 0) { return []; } const query = SQL`SELECT id FROM threads WHERE id IN (${threadIDs})`; const [result] = await dbQuery(query); const verified = []; for (const row of result) { verified.push(row.id.toString()); } return verified; } async function verifyThreadID(threadID: string): Promise { const result = await verifyThreadIDs([threadID]); return result.length !== 0; } type ThreadAncestry = { +containingThreadID: ?string, +community: ?string, +depth: number, }; async function determineThreadAncestry( parentThreadID: ?string, threadType: ThreadType, ): Promise { if (!parentThreadID) { return { containingThreadID: null, community: null, depth: 0 }; } const parentThreadInfos = await fetchServerThreadInfos({ threadID: parentThreadID, }); const parentThreadInfo = parentThreadInfos.threadInfos[parentThreadID]; if (!parentThreadInfo) { throw new ServerError('invalid_parameters'); } const containingThreadID = getContainingThreadID( parentThreadInfo, threadType, ); const community = getCommunity(parentThreadInfo); const depth = parentThreadInfo.depth + 1; return { containingThreadID, community, depth }; } function personalThreadQuery( firstMemberID: string, secondMemberID: string, ): SQLStatementType { return SQL` SELECT t.id FROM threads t INNER JOIN memberships m1 ON m1.thread = t.id AND m1.user = ${firstMemberID} INNER JOIN memberships m2 ON m2.thread = t.id AND m2.user = ${secondMemberID} WHERE t.type = ${threadTypes.PERSONAL} AND m1.role > 0 AND m2.role > 0 `; } async function fetchPersonalThreadID( viewerID: string, otherMemberID: string, ): Promise { const query = personalThreadQuery(viewerID, otherMemberID); const [threads] = await dbQuery(query); return threads[0]?.id.toString(); } async function serverThreadInfoFromMessageInfo( message: RawMessageInfo | MessageInfo, ): Promise { const threadID = message.threadID; const threads = await fetchServerThreadInfos({ threadID }); return threads.threadInfos[threadID]; } async function fetchContainedThreadIDs( parentThreadID: string, ): Promise> { const query = SQL` WITH RECURSIVE thread_tree AS ( SELECT id, containing_thread_id FROM threads WHERE id = ${parentThreadID} UNION ALL SELECT t.id, t.containing_thread_id FROM threads t JOIN thread_tree tt ON t.containing_thread_id = tt.id ) SELECT id FROM thread_tree `; const [result] = await dbQuery(query); return result.map(row => row.id.toString()); } export { fetchServerThreadInfos, fetchThreadInfos, rawThreadInfosFromServerThreadInfos, verifyThreadIDs, verifyThreadID, determineThreadAncestry, personalThreadQuery, fetchPersonalThreadID, serverThreadInfoFromMessageInfo, fetchContainedThreadIDs, }; diff --git a/keyserver/src/push/rescind.js b/keyserver/src/push/rescind.js index 85f3b23a2..d3edb89a2 100644 --- a/keyserver/src/push/rescind.js +++ b/keyserver/src/push/rescind.js @@ -1,369 +1,379 @@ // @flow import apn from '@parse/node-apn'; import type { ResponseFailure } from '@parse/node-apn'; import type { FirebaseError } from 'firebase-admin'; import invariant from 'invariant'; import type { PlatformDetails } from 'lib/types/device-types.js'; import { threadSubscriptions } from 'lib/types/subscription-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { promiseAll } from 'lib/utils/promises.js'; import { tID } from 'lib/utils/validation-utils.js'; import { prepareEncryptedAndroidNotificationRescinds, prepareEncryptedIOSNotificationRescind, } from './crypto.js'; import { getAPNsNotificationTopic } from './providers.js'; import type { NotificationTargetDevice, TargetedAndroidNotification, TargetedAPNsNotification, } from './types.js'; import { apnPush, fcmPush, type APNPushResult, type FCMPushResult, } from './utils.js'; import createIDs from '../creators/id-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import type { SQLStatementType } from '../database/types.js'; import { validateOutput } from '../utils/validation-utils.js'; type ParsedDelivery = { +platform: 'ios' | 'macos' | 'android', +codeVersion: ?number, +stateVersion: ?number, +notificationID: string, +deviceTokens: $ReadOnlyArray, }; type RescindDelivery = { source: 'rescind', rescindedID: string, errors?: | $ReadOnlyArray | $ReadOnlyArray, }; async function rescindPushNotifs( notifCondition: SQLStatementType, inputCountCondition?: SQLStatementType, ) { const notificationExtractString = `$.${threadSubscriptions.home}`; const visPermissionExtractString = `$.${threadPermissions.VISIBLE}.value`; const fetchQuery = SQL` SELECT n.id, n.user, n.thread, n.message, n.delivery, n.collapse_key, COUNT( `; fetchQuery.append(inputCountCondition ? inputCountCondition : SQL`m.thread`); fetchQuery.append(SQL` ) AS unread_count FROM notifications n LEFT JOIN memberships m ON m.user = n.user AND m.last_message > m.last_read_message AND m.role > 0 AND JSON_EXTRACT(subscription, ${notificationExtractString}) AND JSON_EXTRACT(permissions, ${visPermissionExtractString}) WHERE n.rescinded = 0 AND `); fetchQuery.append(notifCondition); fetchQuery.append(SQL` GROUP BY n.id, m.user`); const [fetchResult] = await dbQuery(fetchQuery); const allDeviceTokens = new Set(); const parsedDeliveries: { [string]: $ReadOnlyArray } = {}; for (const row of fetchResult) { const rawDelivery = JSON.parse(row.delivery); const deliveries = Array.isArray(rawDelivery) ? rawDelivery : [rawDelivery]; const id = row.id.toString(); const rowParsedDeliveries = []; for (const delivery of deliveries) { if ( delivery.iosID || delivery.deviceType === 'ios' || delivery.deviceType === 'macos' ) { const deviceTokens = delivery.iosDeviceTokens ?? delivery.deviceTokens; rowParsedDeliveries.push({ notificationID: delivery.iosID, codeVersion: delivery.codeVersion, stateVersion: delivery.stateVersion, platform: delivery.deviceType ?? 'ios', deviceTokens, }); deviceTokens.forEach(deviceToken => allDeviceTokens.add(deviceToken)); } else if (delivery.androidID || delivery.deviceType === 'android') { const deviceTokens = delivery.androidDeviceTokens ?? delivery.deviceTokens; rowParsedDeliveries.push({ notificationID: row.collapse_key ? row.collapse_key : id, codeVersion: delivery.codeVersion, stateVersion: delivery.stateVersion, platform: 'android', deviceTokens, }); deviceTokens.forEach(deviceToken => allDeviceTokens.add(deviceToken)); } } parsedDeliveries[id] = rowParsedDeliveries; } const deviceTokenToCookieID = await getDeviceTokenToCookieID(allDeviceTokens); const deliveryPromises: { [string]: Promise | Promise, } = {}; const notifInfo = {}; const rescindedIDs = []; for (const row of fetchResult) { const id = row.id.toString(); const threadID = row.thread.toString(); notifInfo[id] = { userID: row.user.toString(), threadID, messageID: row.message.toString(), }; for (const delivery of parsedDeliveries[id]) { let platformDetails: PlatformDetails = { platform: delivery.platform }; if (delivery.codeVersion) { platformDetails = { ...platformDetails, codeVersion: delivery.codeVersion, }; } if (delivery.stateVersion) { platformDetails = { ...platformDetails, stateVersion: delivery.stateVersion, }; } if (delivery.platform === 'ios') { const devices = delivery.deviceTokens.map(deviceToken => ({ deviceToken, cookieID: deviceTokenToCookieID[deviceToken], })); const deliveryPromise = (async () => { const targetedNotifications = await prepareIOSNotification( delivery.notificationID, row.unread_count, threadID, platformDetails, devices, ); return await apnPush({ targetedNotifications, platformDetails: { platform: 'ios', codeVersion: delivery.codeVersion, }, }); })(); deliveryPromises[id] = deliveryPromise; } else if (delivery.platform === 'android') { const devices = delivery.deviceTokens.map(deviceToken => ({ deviceToken, cookieID: deviceTokenToCookieID[deviceToken], })); const deliveryPromise = (async () => { const targetedNotifications = await prepareAndroidNotification( delivery.notificationID, row.unread_count, threadID, platformDetails, devices, ); return await fcmPush({ targetedNotifications, codeVersion: delivery.codeVersion, }); })(); deliveryPromises[id] = deliveryPromise; } } rescindedIDs.push(id); } const numRescinds = Object.keys(deliveryPromises).length; - const promises = [promiseAll(deliveryPromises)]; - if (numRescinds > 0) { - promises.push(createIDs('notifications', numRescinds)); - } - if (rescindedIDs.length > 0) { + const dbIDsPromise: Promise> = (async () => { + if (numRescinds === 0) { + return undefined; + } + return await createIDs('notifications', numRescinds); + })(); + const rescindPromise: Promise = (async () => { + if (rescindedIDs.length === 0) { + return undefined; + } const rescindQuery = SQL` UPDATE notifications SET rescinded = 1 WHERE id IN (${rescindedIDs}) `; - promises.push(dbQuery(rescindQuery)); - } + return await dbQuery(rescindQuery); + })(); + + const [deliveryResults, dbIDs] = await Promise.all([ + promiseAll(deliveryPromises), + dbIDsPromise, + rescindPromise, + ]); - const [deliveryResults, dbIDs] = await Promise.all(promises); const newNotifRows = []; if (numRescinds > 0) { invariant(dbIDs, 'dbIDs should be set'); for (const rescindedID in deliveryResults) { const delivery: RescindDelivery = { source: 'rescind', rescindedID, }; const { errors } = deliveryResults[rescindedID]; if (errors) { delivery.errors = errors; } const dbID = dbIDs.shift(); const { userID, threadID, messageID } = notifInfo[rescindedID]; newNotifRows.push([ dbID, userID, threadID, messageID, null, JSON.stringify([delivery]), 1, ]); } } if (newNotifRows.length > 0) { const insertQuery = SQL` INSERT INTO notifications (id, user, thread, message, collapse_key, delivery, rescinded) VALUES ${newNotifRows} `; await dbQuery(insertQuery); } } async function getDeviceTokenToCookieID( deviceTokens: Set, ): Promise<{ +[string]: string }> { if (deviceTokens.size === 0) { return {}; } const deviceTokenToCookieID = {}; const fetchCookiesQuery = SQL` SELECT id, device_token FROM cookies WHERE device_token IN (${[...deviceTokens]}) `; const [fetchResult] = await dbQuery(fetchCookiesQuery); for (const row of fetchResult) { deviceTokenToCookieID[row.device_token.toString()] = row.id.toString(); } return deviceTokenToCookieID; } async function conditionallyEncryptNotification( notification: T, codeVersion: ?number, devices: $ReadOnlyArray, encryptCallback: ( devices: $ReadOnlyArray, notification: T, codeVersion?: ?number, ) => Promise< $ReadOnlyArray<{ +notification: T, +cookieID: string, +deviceToken: string, +encryptionOrder?: number, }>, >, ): Promise<$ReadOnlyArray<{ +deviceToken: string, +notification: T }>> { const shouldBeEncrypted = codeVersion && codeVersion >= 233; if (!shouldBeEncrypted) { return devices.map(({ deviceToken }) => ({ notification, deviceToken, })); } const notifications = await encryptCallback( devices, notification, codeVersion, ); return notifications.map(({ deviceToken, notification: notif }) => ({ deviceToken, notification: notif, })); } async function prepareIOSNotification( iosID: string, unreadCount: number, threadID: string, platformDetails: PlatformDetails, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { threadID = validateOutput(platformDetails, tID, threadID); const { codeVersion } = platformDetails; const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'ios', codeVersion, }); if (codeVersion && codeVersion > 198) { notification.mutableContent = true; notification.pushType = 'alert'; notification.badge = unreadCount; } else { notification.priority = 5; notification.contentAvailable = true; notification.pushType = 'background'; } notification.payload = codeVersion && codeVersion > 135 ? { backgroundNotifType: 'CLEAR', notificationId: iosID, setUnreadStatus: true, threadID, } : { managedAps: { action: 'CLEAR', notificationId: iosID, }, }; return await conditionallyEncryptNotification( notification, codeVersion, devices, prepareEncryptedIOSNotificationRescind, ); } async function prepareAndroidNotification( notifID: string, unreadCount: number, threadID: string, platformDetails: PlatformDetails, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { threadID = validateOutput(platformDetails, tID, threadID); const { codeVersion } = platformDetails; const notification = { data: { badge: unreadCount.toString(), rescind: 'true', rescindID: notifID, setUnreadStatus: 'true', threadID, }, }; return await conditionallyEncryptNotification( notification, codeVersion, devices, prepareEncryptedAndroidNotificationRescinds, ); } export { rescindPushNotifs }; diff --git a/keyserver/src/push/send.js b/keyserver/src/push/send.js index c811761fd..31c5460fd 100644 --- a/keyserver/src/push/send.js +++ b/keyserver/src/push/send.js @@ -1,1727 +1,1733 @@ // @flow import apn from '@parse/node-apn'; import type { ResponseFailure } from '@parse/node-apn'; import invariant from 'invariant'; import _cloneDeep from 'lodash/fp/cloneDeep.js'; import _flow from 'lodash/fp/flow.js'; import _groupBy from 'lodash/fp/groupBy.js'; import _mapValues from 'lodash/fp/mapValues.js'; import _pickBy from 'lodash/fp/pickBy.js'; +import type { QueryResults } from 'mysql'; import t from 'tcomb'; import uuidv4 from 'uuid/v4.js'; import { oldValidUsernameRegex } from 'lib/shared/account-utils.js'; import { isUserMentioned } from 'lib/shared/mention-utils.js'; import { createMessageInfo, sortMessageInfoList, shimUnsupportedRawMessageInfos, } from 'lib/shared/message-utils.js'; import { messageSpecs } from 'lib/shared/messages/message-specs.js'; import { notifTextsForMessageInfo } from 'lib/shared/notif-utils.js'; import { isStaff } from 'lib/shared/staff-utils.js'; import { rawThreadInfoFromServerThreadInfo, threadInfoFromRawThreadInfo, } from 'lib/shared/thread-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { Platform, PlatformDetails } from 'lib/types/device-types.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import { type RawMessageInfo, type MessageData, } from 'lib/types/message-types.js'; import { rawMessageInfoValidator } from 'lib/types/message-types.js'; import type { WNSNotification, ResolvedNotifTexts, } from 'lib/types/notif-types.js'; import { resolvedNotifTextsValidator } from 'lib/types/notif-types.js'; import type { ServerThreadInfo, ThreadInfo } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { type GlobalUserInfo } from 'lib/types/user-types.js'; import { isDev } from 'lib/utils/dev-utils.js'; import { values } from 'lib/utils/objects.js'; -import { promiseAll } from 'lib/utils/promises.js'; import { tID, tPlatformDetails, tShape } from 'lib/utils/validation-utils.js'; import { prepareEncryptedIOSNotifications, prepareEncryptedAndroidNotifications, prepareEncryptedWebNotifications, } from './crypto.js'; import { getAPNsNotificationTopic } from './providers.js'; import { rescindPushNotifs } from './rescind.js'; import type { NotificationTargetDevice, TargetedAPNsNotification, TargetedAndroidNotification, TargetedWebNotification, TargetedWNSNotification, } from './types.js'; import { apnPush, fcmPush, getUnreadCounts, apnMaxNotificationPayloadByteSize, fcmMaxNotificationPayloadByteSize, wnsMaxNotificationPayloadByteSize, webPush, wnsPush, type WebPushError, type WNSPushError, } from './utils.js'; import createIDs from '../creators/id-creator.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL, mergeOrConditions } from '../database/database.js'; import type { CollapsableNotifInfo } from '../fetchers/message-fetchers.js'; import { fetchCollapsableNotifs } from '../fetchers/message-fetchers.js'; import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchUserInfos } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { getENSNames } from '../utils/ens-cache.js'; import { validateOutput } from '../utils/validation-utils.js'; export type Device = { +platform: Platform, +deviceToken: string, +cookieID: string, +codeVersion: ?number, +stateVersion: ?number, }; export type PushUserInfo = { +devices: Device[], // messageInfos and messageDatas have the same key +messageInfos: RawMessageInfo[], +messageDatas: MessageData[], }; type Delivery = PushDelivery | { collapsedInto: string }; type NotificationRow = { +dbID: string, +userID: string, +threadID?: ?string, +messageID?: ?string, +collapseKey?: ?string, +deliveries: Delivery[], }; export type PushInfo = { [userID: string]: PushUserInfo }; async function sendPushNotifs(pushInfo: PushInfo) { if (Object.keys(pushInfo).length === 0) { return; } const [ unreadCounts, { usersToCollapsableNotifInfo, serverThreadInfos, userInfos }, dbIDs, ] = await Promise.all([ getUnreadCounts(Object.keys(pushInfo)), fetchInfos(pushInfo), createDBIDs(pushInfo), ]); const preparePromises: Array>> = []; const notifications: Map = new Map(); for (const userID in usersToCollapsableNotifInfo) { const threadInfos = _flow( _mapValues((serverThreadInfo: ServerThreadInfo) => { const rawThreadInfo = rawThreadInfoFromServerThreadInfo( serverThreadInfo, userID, ); if (!rawThreadInfo) { return null; } return threadInfoFromRawThreadInfo(rawThreadInfo, userID, userInfos); }), _pickBy(threadInfo => threadInfo), )(serverThreadInfos); for (const notifInfo of usersToCollapsableNotifInfo[userID]) { preparePromises.push( preparePushNotif({ notifInfo, userID, pushUserInfo: pushInfo[userID], unreadCount: unreadCounts[userID], threadInfos, userInfos, dbIDs, rowsToSave: notifications, }), ); } } const prepareResults = await Promise.all(preparePromises); const flattenedPrepareResults = prepareResults.filter(Boolean).flat(); const deliveryResults = await deliverPushNotifsInEncryptionOrder( flattenedPrepareResults, ); const cleanUpPromise = (async () => { if (dbIDs.length === 0) { return; } const query = SQL`DELETE FROM ids WHERE id IN (${dbIDs})`; await dbQuery(query); })(); await Promise.all([ cleanUpPromise, saveNotifResults(deliveryResults, notifications, true), ]); } type PreparePushResult = { +platform: Platform, +notificationInfo: NotificationInfo, +notification: | TargetedAPNsNotification | TargetedAndroidNotification | TargetedWebNotification | TargetedWNSNotification, }; async function preparePushNotif(input: { notifInfo: CollapsableNotifInfo, userID: string, pushUserInfo: PushUserInfo, unreadCount: number, threadInfos: { +[threadID: string]: ThreadInfo }, userInfos: { +[userID: string]: GlobalUserInfo }, dbIDs: string[], // mutable rowsToSave: Map, // mutable }): Promise> { const { notifInfo, userID, pushUserInfo, unreadCount, threadInfos, userInfos, dbIDs, rowsToSave, } = input; const hydrateMessageInfo = (rawMessageInfo: RawMessageInfo) => createMessageInfo(rawMessageInfo, userID, userInfos, threadInfos); const newMessageInfos = []; const newRawMessageInfos = []; for (const newRawMessageInfo of notifInfo.newMessageInfos) { const newMessageInfo = hydrateMessageInfo(newRawMessageInfo); if (newMessageInfo) { newMessageInfos.push(newMessageInfo); newRawMessageInfos.push(newRawMessageInfo); } } if (newMessageInfos.length === 0) { return null; } const existingMessageInfos = notifInfo.existingMessageInfos .map(hydrateMessageInfo) .filter(Boolean); const allMessageInfos = sortMessageInfoList([ ...newMessageInfos, ...existingMessageInfos, ]); const [firstNewMessageInfo, ...remainingNewMessageInfos] = newMessageInfos; const { threadID } = firstNewMessageInfo; const threadInfo = threadInfos[threadID]; const parentThreadInfo = threadInfo.parentThreadID ? threadInfos[threadInfo.parentThreadID] : null; const updateBadge = threadInfo.currentUser.subscription.home; const displayBanner = threadInfo.currentUser.subscription.pushNotifs; const username = userInfos[userID] && userInfos[userID].username; const userWasMentioned = username && threadInfo.currentUser.role && oldValidUsernameRegex.test(username) && newMessageInfos.some(newMessageInfo => { const unwrappedMessageInfo = newMessageInfo.type === messageTypes.SIDEBAR_SOURCE ? newMessageInfo.sourceMessage : newMessageInfo; return ( unwrappedMessageInfo.type === messageTypes.TEXT && isUserMentioned(username, unwrappedMessageInfo.text) ); }); if (!updateBadge && !displayBanner && !userWasMentioned) { return null; } const badgeOnly = !displayBanner && !userWasMentioned; const notifTargetUserInfo = { id: userID, username }; const notifTexts = await notifTextsForMessageInfo( allMessageInfos, threadInfo, parentThreadInfo, notifTargetUserInfo, getENSNames, ); if (!notifTexts) { return null; } const dbID = dbIDs.shift(); invariant(dbID, 'should have sufficient DB IDs'); const byPlatform = getDevicesByPlatform(pushUserInfo.devices); const firstMessageID = firstNewMessageInfo.id; invariant(firstMessageID, 'RawMessageInfo.id should be set on server'); const notificationInfo = { source: 'new_message', dbID, userID, threadID, messageID: firstMessageID, collapseKey: notifInfo.collapseKey, }; const preparePromises: Array>> = []; const iosVersionsToTokens = byPlatform.get('ios'); if (iosVersionsToTokens) { for (const [versionKey, devices] of iosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails: PlatformDetails = { platform: 'ios', codeVersion, stateVersion, }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareAPNsNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount, platformDetails, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'ios', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const androidVersionsToTokens = byPlatform.get('android'); if (androidVersionsToTokens) { for (const [versionKey, devices] of androidVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'android', codeVersion, stateVersion, }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareAndroidNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount, platformDetails, dbID, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'android', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const webVersionsToTokens = byPlatform.get('web'); if (webVersionsToTokens) { for (const [versionKey, devices] of webVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'web', codeVersion, stateVersion, }; const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareWebNotification( userID, { notifTexts, threadID: threadInfo.id, unreadCount, platformDetails, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'web', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const macosVersionsToTokens = byPlatform.get('macos'); if (macosVersionsToTokens) { for (const [versionKey, devices] of macosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'macos', codeVersion, stateVersion, }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareAPNsNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount, platformDetails, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'macos', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const windowsVersionsToTokens = byPlatform.get('windows'); if (windowsVersionsToTokens) { for (const [versionKey, devices] of windowsVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'windows', codeVersion, stateVersion, }; const preparePromise: Promise<$ReadOnlyArray> = (async () => { const notification = await prepareWNSNotification({ notifTexts, threadID: threadInfo.id, unreadCount, platformDetails, }); return devices.map(({ deviceToken }) => ({ notification: ({ deviceToken, notification, }: TargetedWNSNotification), platform: 'windows', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } for (const newMessageInfo of remainingNewMessageInfos) { const newDBID = dbIDs.shift(); invariant(newDBID, 'should have sufficient DB IDs'); const messageID = newMessageInfo.id; invariant(messageID, 'RawMessageInfo.id should be set on server'); rowsToSave.set(newDBID, { dbID: newDBID, userID, threadID: newMessageInfo.threadID, messageID, collapseKey: notifInfo.collapseKey, deliveries: [{ collapsedInto: dbID }], }); } const prepareResults = await Promise.all(preparePromises); return prepareResults.flat(); } // For better readability we don't differentiate between // encrypted and unencrypted notifs and order them together function compareEncryptionOrder( pushNotif1: PreparePushResult, pushNotif2: PreparePushResult, ): number { const order1 = pushNotif1.notification.encryptionOrder ?? 0; const order2 = pushNotif2.notification.encryptionOrder ?? 0; return order1 - order2; } async function deliverPushNotifsInEncryptionOrder( preparedPushNotifs: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const deliveryPromises: Array>> = []; const groupedByDevice = _groupBy( preparedPushNotif => preparedPushNotif.deviceToken, )(preparedPushNotifs); for (const preparedPushNotifsForDevice of values(groupedByDevice)) { const orderedPushNotifsForDevice = preparedPushNotifsForDevice.sort( compareEncryptionOrder, ); const deviceDeliveryPromise = (async () => { const deliveries = []; for (const preparedPushNotif of orderedPushNotifsForDevice) { const { platform, notification, notificationInfo } = preparedPushNotif; let delivery: PushResult; if (platform === 'ios' || platform === 'macos') { delivery = await sendAPNsNotification( platform, [notification], notificationInfo, ); } else if (platform === 'android') { delivery = await sendAndroidNotification( [notification], notificationInfo, ); } else if (platform === 'web') { delivery = await sendWebNotifications( [notification], notificationInfo, ); } else if (platform === 'windows') { delivery = await sendWNSNotification( [notification], notificationInfo, ); } if (delivery) { deliveries.push(delivery); } } return deliveries; })(); deliveryPromises.push(deviceDeliveryPromise); } const deliveryResults = await Promise.all(deliveryPromises); return deliveryResults.flat(); } async function sendRescindNotifs(rescindInfo: PushInfo) { if (Object.keys(rescindInfo).length === 0) { return; } const usersToCollapsableNotifInfo = await fetchCollapsableNotifs(rescindInfo); const promises = []; for (const userID in usersToCollapsableNotifInfo) { for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const existingMessageInfo of notifInfo.existingMessageInfos) { const rescindCondition = SQL` n.user = ${userID} AND n.thread = ${existingMessageInfo.threadID} AND n.message = ${existingMessageInfo.id} `; promises.push(rescindPushNotifs(rescindCondition)); } } } await Promise.all(promises); } // The results in deliveryResults will be combined with the rows // in rowsToSave and then written to the notifications table async function saveNotifResults( deliveryResults: $ReadOnlyArray, inputRowsToSave: Map, rescindable: boolean, ) { const rowsToSave = new Map(inputRowsToSave); const allInvalidTokens = []; for (const deliveryResult of deliveryResults) { const { info, delivery, invalidTokens } = deliveryResult; const { dbID, userID } = info; const curNotifRow = rowsToSave.get(dbID); if (curNotifRow) { curNotifRow.deliveries.push(delivery); } else { // Ternary expressions for Flow const threadID = info.threadID ? info.threadID : null; const messageID = info.messageID ? info.messageID : null; const collapseKey = info.collapseKey ? info.collapseKey : null; rowsToSave.set(dbID, { dbID, userID, threadID, messageID, collapseKey, deliveries: [delivery], }); } if (invalidTokens) { allInvalidTokens.push({ userID, tokens: invalidTokens, }); } } const notificationRows = []; for (const notification of rowsToSave.values()) { notificationRows.push([ notification.dbID, notification.userID, notification.threadID, notification.messageID, notification.collapseKey, JSON.stringify(notification.deliveries), Number(!rescindable), ]); } const dbPromises: Array> = []; if (allInvalidTokens.length > 0) { dbPromises.push(removeInvalidTokens(allInvalidTokens)); } if (notificationRows.length > 0) { const query = SQL` INSERT INTO notifications (id, user, thread, message, collapse_key, delivery, rescinded) VALUES ${notificationRows} `; dbPromises.push(dbQuery(query)); } if (dbPromises.length > 0) { await Promise.all(dbPromises); } } async function fetchInfos(pushInfo: PushInfo) { const usersToCollapsableNotifInfo = await fetchCollapsableNotifs(pushInfo); const threadIDs = new Set(); const threadWithChangedNamesToMessages = new Map(); const addThreadIDsFromMessageInfos = (rawMessageInfo: RawMessageInfo) => { const threadID = rawMessageInfo.threadID; threadIDs.add(threadID); const messageSpec = messageSpecs[rawMessageInfo.type]; if (messageSpec.threadIDs) { for (const id of messageSpec.threadIDs(rawMessageInfo)) { threadIDs.add(id); } } if ( rawMessageInfo.type === messageTypes.CHANGE_SETTINGS && rawMessageInfo.field === 'name' ) { const messages = threadWithChangedNamesToMessages.get(threadID); if (messages) { messages.push(rawMessageInfo.id); } else { threadWithChangedNamesToMessages.set(threadID, [rawMessageInfo.id]); } } }; for (const userID in usersToCollapsableNotifInfo) { for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const rawMessageInfo of notifInfo.existingMessageInfos) { addThreadIDsFromMessageInfos(rawMessageInfo); } for (const rawMessageInfo of notifInfo.newMessageInfos) { addThreadIDsFromMessageInfos(rawMessageInfo); } } } - const promises = {}; // These threadInfos won't have currentUser set - promises.threadResult = fetchServerThreadInfos({ threadIDs }); - if (threadWithChangedNamesToMessages.size > 0) { + const threadPromise = fetchServerThreadInfos({ threadIDs }); + + const oldNamesPromise: Promise = (async () => { + if (threadWithChangedNamesToMessages.size === 0) { + return undefined; + } const typesThatAffectName = [ messageTypes.CHANGE_SETTINGS, messageTypes.CREATE_THREAD, ]; const oldNameQuery = SQL` SELECT IF( JSON_TYPE(JSON_EXTRACT(m.content, "$.name")) = 'NULL', "", JSON_UNQUOTE(JSON_EXTRACT(m.content, "$.name")) ) AS name, m.thread FROM ( SELECT MAX(id) AS id FROM messages WHERE type IN (${typesThatAffectName}) AND JSON_EXTRACT(content, "$.name") IS NOT NULL AND`; const threadClauses = []; for (const [threadID, messages] of threadWithChangedNamesToMessages) { threadClauses.push( SQL`(thread = ${threadID} AND id NOT IN (${messages}))`, ); } oldNameQuery.append(mergeOrConditions(threadClauses)); oldNameQuery.append(SQL` GROUP BY thread ) x LEFT JOIN messages m ON m.id = x.id `); - promises.oldNames = dbQuery(oldNameQuery); - } + return await dbQuery(oldNameQuery); + })(); - const { threadResult, oldNames } = await promiseAll(promises); + const [threadResult, oldNames] = await Promise.all([ + threadPromise, + oldNamesPromise, + ]); const serverThreadInfos = threadResult.threadInfos; if (oldNames) { const [result] = oldNames; for (const row of result) { const threadID = row.thread.toString(); serverThreadInfos[threadID].name = row.name; } } const userInfos = await fetchNotifUserInfos( serverThreadInfos, usersToCollapsableNotifInfo, ); return { usersToCollapsableNotifInfo, serverThreadInfos, userInfos }; } async function fetchNotifUserInfos( serverThreadInfos: { +[threadID: string]: ServerThreadInfo }, usersToCollapsableNotifInfo: { +[userID: string]: CollapsableNotifInfo[] }, ) { const missingUserIDs = new Set(); for (const threadID in serverThreadInfos) { const serverThreadInfo = serverThreadInfos[threadID]; for (const member of serverThreadInfo.members) { missingUserIDs.add(member.id); } } const addUserIDsFromMessageInfos = (rawMessageInfo: RawMessageInfo) => { missingUserIDs.add(rawMessageInfo.creatorID); const userIDs = messageSpecs[rawMessageInfo.type].userIDs?.(rawMessageInfo) ?? []; for (const userID of userIDs) { missingUserIDs.add(userID); } }; for (const userID in usersToCollapsableNotifInfo) { missingUserIDs.add(userID); for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const rawMessageInfo of notifInfo.existingMessageInfos) { addUserIDsFromMessageInfos(rawMessageInfo); } for (const rawMessageInfo of notifInfo.newMessageInfos) { addUserIDsFromMessageInfos(rawMessageInfo); } } } return await fetchUserInfos([...missingUserIDs]); } async function createDBIDs(pushInfo: PushInfo): Promise { let numIDsNeeded = 0; for (const userID in pushInfo) { numIDsNeeded += pushInfo[userID].messageInfos.length; } return await createIDs('notifications', numIDsNeeded); } type VersionKey = { codeVersion: number, stateVersion: number }; const versionKeyRegex: RegExp = new RegExp(/^-?\d+\|-?\d+$/); function versionKeyToString(versionKey: VersionKey): string { return `${versionKey.codeVersion}|${versionKey.stateVersion}`; } function stringToVersionKey(versionKeyString: string): VersionKey { invariant( versionKeyRegex.test(versionKeyString), 'should pass correct version key string', ); const [codeVersion, stateVersion] = versionKeyString.split('|').map(Number); return { codeVersion, stateVersion }; } function getDevicesByPlatform( devices: $ReadOnlyArray, ): Map>> { const byPlatform = new Map(); for (const device of devices) { let innerMap = byPlatform.get(device.platform); if (!innerMap) { innerMap = new Map(); byPlatform.set(device.platform, innerMap); } const codeVersion: number = device.codeVersion !== null && device.codeVersion !== undefined && device.platform !== 'windows' && device.platform !== 'macos' ? device.codeVersion : -1; const stateVersion: number = device.stateVersion ?? -1; const versionKey = versionKeyToString({ codeVersion, stateVersion, }); let innerMostArrayTmp: ?Array = innerMap.get(versionKey); if (!innerMostArrayTmp) { innerMostArrayTmp = []; innerMap.set(versionKey, innerMostArrayTmp); } const innerMostArray = innerMostArrayTmp; innerMostArray.push({ cookieID: device.cookieID, deviceToken: device.deviceToken, }); } return byPlatform; } type APNsNotifInputData = { +notifTexts: ResolvedNotifTexts, +newRawMessageInfos: RawMessageInfo[], +threadID: string, +collapseKey: ?string, +badgeOnly: boolean, +unreadCount: number, +platformDetails: PlatformDetails, }; const apnsNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, newRawMessageInfos: t.list(rawMessageInfoValidator), threadID: tID, collapseKey: t.maybe(t.String), badgeOnly: t.Boolean, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareAPNsNotification( inputData: APNsNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, apnsNotifInputDataValidator, inputData, ); const { notifTexts, newRawMessageInfos, threadID, collapseKey, badgeOnly, unreadCount, platformDetails, } = convertedData; const canDecryptNonCollapsibleTextNotifs = platformDetails.codeVersion && platformDetails.codeVersion > 222; const isNonCollapsibleTextNotification = newRawMessageInfos.every( newRawMessageInfo => newRawMessageInfo.type === messageTypes.TEXT, ) && !collapseKey; const canDecryptAllNotifTypes = platformDetails.codeVersion && platformDetails.codeVersion >= 267; const shouldBeEncrypted = platformDetails.platform === 'ios' && (canDecryptAllNotifTypes || (isNonCollapsibleTextNotification && canDecryptNonCollapsibleTextNotifs)); const uniqueID = uuidv4(); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic(platformDetails); const { merged, ...rest } = notifTexts; // We don't include alert's body on macos because we // handle displaying the notification ourselves and // we don't want macOS to display it automatically. if (!badgeOnly && platformDetails.platform !== 'macos') { notification.body = merged; notification.sound = 'default'; } notification.payload = { ...notification.payload, ...rest, }; notification.badge = unreadCount; notification.threadId = threadID; notification.id = uniqueID; notification.pushType = 'alert'; notification.payload.id = uniqueID; notification.payload.threadID = threadID; if (platformDetails.codeVersion && platformDetails.codeVersion > 198) { notification.mutableContent = true; } if (collapseKey && canDecryptAllNotifTypes) { notification.payload.collapseID = collapseKey; } else if (collapseKey) { notification.collapseId = collapseKey; } const messageInfos = JSON.stringify(newRawMessageInfos); // We make a copy before checking notification's length, because calling // length compiles the notification and makes it immutable. Further // changes to its properties won't be reflected in the final plaintext // data that is sent. const copyWithMessageInfos = _cloneDeep(notification); copyWithMessageInfos.payload = { ...copyWithMessageInfos.payload, messageInfos, }; const notificationSizeValidator = notif => notif.length() <= apnMaxNotificationPayloadByteSize; if (!shouldBeEncrypted) { const notificationToSend = notificationSizeValidator( _cloneDeep(copyWithMessageInfos), ) ? copyWithMessageInfos : notification; return devices.map(({ deviceToken }) => ({ notification: notificationToSend, deviceToken, })); } const notifsWithMessageInfos = await prepareEncryptedIOSNotifications( devices, copyWithMessageInfos, platformDetails.codeVersion, notificationSizeValidator, ); const devicesWithExcessiveSize = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => payloadSizeExceeded) .map(({ deviceToken, cookieID }) => ({ deviceToken, cookieID })); if (devicesWithExcessiveSize.length === 0) { return notifsWithMessageInfos.map( ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }) => ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }), ); } const notifsWithoutMessageInfos = await prepareEncryptedIOSNotifications( devicesWithExcessiveSize, notification, platformDetails.codeVersion, ); const targetedNotifsWithMessageInfos = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => !payloadSizeExceeded) .map( ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }) => ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }), ); const targetedNotifsWithoutMessageInfos = notifsWithoutMessageInfos.map( ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }) => ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }), ); return [ ...targetedNotifsWithMessageInfos, ...targetedNotifsWithoutMessageInfos, ]; } type AndroidNotifInputData = { ...APNsNotifInputData, +dbID: string, }; const androidNotifInputDataValidator = tShape({ ...apnsNotifInputDataValidator.meta.props, dbID: t.String, }); async function prepareAndroidNotification( inputData: AndroidNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, androidNotifInputDataValidator, inputData, ); const { notifTexts, newRawMessageInfos, threadID, collapseKey, badgeOnly, unreadCount, platformDetails: { codeVersion }, dbID, } = convertedData; const canDecryptNonCollapsibleTextNotifs = codeVersion && codeVersion > 228; const isNonCollapsibleTextNotif = newRawMessageInfos.every( newRawMessageInfo => newRawMessageInfo.type === messageTypes.TEXT, ) && !collapseKey; const canDecryptAllNotifTypes = codeVersion && codeVersion >= 267; const shouldBeEncrypted = canDecryptAllNotifTypes || (canDecryptNonCollapsibleTextNotifs && isNonCollapsibleTextNotif); const { merged, ...rest } = notifTexts; const notification = { data: { badge: unreadCount.toString(), ...rest, threadID, }, }; let notifID; if (collapseKey && canDecryptAllNotifTypes) { notifID = dbID; notification.data = { ...notification.data, collapseKey, }; } else if (collapseKey) { notifID = collapseKey; } else { notifID = dbID; } // The reason we only include `badgeOnly` for newer clients is because older // clients don't know how to parse it. The reason we only include `id` for // newer clients is that if the older clients see that field, they assume // the notif has a full payload, and then crash when trying to parse it. // By skipping `id` we allow old clients to still handle in-app notifs and // badge updating. if (!badgeOnly || (codeVersion && codeVersion >= 69)) { notification.data = { ...notification.data, id: notifID, badgeOnly: badgeOnly ? '1' : '0', }; } const messageInfos = JSON.stringify(newRawMessageInfos); const copyWithMessageInfos = { ...notification, data: { ...notification.data, messageInfos }, }; if (!shouldBeEncrypted) { const notificationToSend = Buffer.byteLength(JSON.stringify(copyWithMessageInfos)) <= fcmMaxNotificationPayloadByteSize ? copyWithMessageInfos : notification; return devices.map(({ deviceToken }) => ({ notification: notificationToSend, deviceToken, })); } const notificationsSizeValidator = notif => { const serializedNotif = JSON.stringify(notif); return ( !serializedNotif || Buffer.byteLength(serializedNotif) <= fcmMaxNotificationPayloadByteSize ); }; const notifsWithMessageInfos = await prepareEncryptedAndroidNotifications( devices, copyWithMessageInfos, notificationsSizeValidator, ); const devicesWithExcessiveSize = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => payloadSizeExceeded) .map(({ cookieID, deviceToken }) => ({ cookieID, deviceToken })); if (devicesWithExcessiveSize.length === 0) { return notifsWithMessageInfos.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); } const notifsWithoutMessageInfos = await prepareEncryptedAndroidNotifications( devicesWithExcessiveSize, notification, ); const targetedNotifsWithMessageInfos = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => !payloadSizeExceeded) .map(({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, })); const targetedNotifsWithoutMessageInfos = notifsWithoutMessageInfos.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); return [ ...targetedNotifsWithMessageInfos, ...targetedNotifsWithoutMessageInfos, ]; } type WebNotifInputData = { +notifTexts: ResolvedNotifTexts, +threadID: string, +unreadCount: number, +platformDetails: PlatformDetails, }; const webNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, threadID: tID, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareWebNotification( userID: string, inputData: WebNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, webNotifInputDataValidator, inputData, ); const { notifTexts, threadID, unreadCount } = convertedData; const id = uuidv4(); const { merged, ...rest } = notifTexts; const notification = { ...rest, unreadCount, id, threadID, }; const isStaffOrDev = isStaff(userID) || isDev; const shouldBeEncrypted = hasMinCodeVersion(convertedData.platformDetails, { web: 43, }) && isStaffOrDev; if (!shouldBeEncrypted) { return devices.map(({ deviceToken }) => ({ deviceToken, notification })); } return prepareEncryptedWebNotifications(devices, notification); } type WNSNotifInputData = { +notifTexts: ResolvedNotifTexts, +threadID: string, +unreadCount: number, +platformDetails: PlatformDetails, }; const wnsNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, threadID: tID, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareWNSNotification( inputData: WNSNotifInputData, ): Promise { const convertedData = validateOutput( inputData.platformDetails, wnsNotifInputDataValidator, inputData, ); const { notifTexts, threadID, unreadCount } = convertedData; const { merged, ...rest } = notifTexts; const notification = { ...rest, unreadCount, threadID, }; if ( Buffer.byteLength(JSON.stringify(notification)) > wnsMaxNotificationPayloadByteSize ) { console.warn('WNS notification exceeds size limit'); } return notification; } type NotificationInfo = | { +source: 'new_message', +dbID: string, +userID: string, +threadID: string, +messageID: string, +collapseKey: ?string, +codeVersion: number, +stateVersion: number, } | { +source: 'mark_as_unread' | 'mark_as_read' | 'activity_update', +dbID: string, +userID: string, +codeVersion: number, +stateVersion: number, }; type APNsDelivery = { +source: $PropertyType, +deviceType: 'ios' | 'macos', +iosID: string, +deviceTokens: $ReadOnlyArray, +codeVersion: number, +stateVersion: number, +errors?: $ReadOnlyArray, +encryptedPayloadHashes?: $ReadOnlyArray, +deviceTokensToPayloadHash?: { +[deviceToken: string]: string, }, }; type APNsResult = { info: NotificationInfo, delivery: APNsDelivery, invalidTokens?: $ReadOnlyArray, }; async function sendAPNsNotification( platform: 'ios' | 'macos', targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion, stateVersion } = notificationInfo; const response = await apnPush({ targetedNotifications, platformDetails: { platform, codeVersion }, }); invariant( new Set(targetedNotifications.map(({ notification }) => notification.id)) .size === 1, 'Encrypted versions of the same notification must share id value', ); const iosID = targetedNotifications[0].notification.id; const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); let delivery: APNsDelivery = { source, deviceType: platform, iosID, deviceTokens, codeVersion, stateVersion, }; if (response.errors) { delivery = { ...delivery, errors: response.errors, }; } const deviceTokensToPayloadHash: { [string]: string } = {}; for (const targetedNotification of targetedNotifications) { if (targetedNotification.encryptedPayloadHash) { deviceTokensToPayloadHash[targetedNotification.deviceToken] = targetedNotification.encryptedPayloadHash; } } if (Object.keys(deviceTokensToPayloadHash).length !== 0) { delivery = { ...delivery, deviceTokensToPayloadHash, }; } const result: APNsResult = { info: notificationInfo, delivery, }; if (response.invalidTokens) { result.invalidTokens = response.invalidTokens; } return result; } type PushResult = AndroidResult | APNsResult | WebResult | WNSResult; type PushDelivery = AndroidDelivery | APNsDelivery | WebDelivery | WNSDelivery; type AndroidDelivery = { source: $PropertyType, deviceType: 'android', androidIDs: $ReadOnlyArray, deviceTokens: $ReadOnlyArray, codeVersion: number, stateVersion: number, errors?: $ReadOnlyArray, }; type AndroidResult = { info: NotificationInfo, delivery: AndroidDelivery, invalidTokens?: $ReadOnlyArray, }; async function sendAndroidNotification( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const collapseKey = notificationInfo.collapseKey ? notificationInfo.collapseKey : null; // for Flow... const { source, codeVersion, stateVersion } = notificationInfo; const response = await fcmPush({ targetedNotifications, collapseKey, codeVersion, }); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const androidIDs = response.fcmIDs ? response.fcmIDs : []; const delivery: AndroidDelivery = { source, deviceType: 'android', androidIDs, deviceTokens, codeVersion, stateVersion, }; if (response.errors) { delivery.errors = response.errors; } const result: AndroidResult = { info: notificationInfo, delivery, }; if (response.invalidTokens) { result.invalidTokens = response.invalidTokens; } return result; } type WebDelivery = { +source: $PropertyType, +deviceType: 'web', +deviceTokens: $ReadOnlyArray, +codeVersion?: number, +stateVersion: number, +errors?: $ReadOnlyArray, }; type WebResult = { +info: NotificationInfo, +delivery: WebDelivery, +invalidTokens?: $ReadOnlyArray, }; async function sendWebNotifications( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion, stateVersion } = notificationInfo; const response = await webPush(targetedNotifications); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const delivery: WebDelivery = { source, deviceType: 'web', deviceTokens, codeVersion, errors: response.errors, stateVersion, }; const result: WebResult = { info: notificationInfo, delivery, invalidTokens: response.invalidTokens, }; return result; } type WNSDelivery = { +source: $PropertyType, +deviceType: 'windows', +wnsIDs: $ReadOnlyArray, +deviceTokens: $ReadOnlyArray, +codeVersion?: number, +stateVersion: number, +errors?: $ReadOnlyArray, }; type WNSResult = { +info: NotificationInfo, +delivery: WNSDelivery, +invalidTokens?: $ReadOnlyArray, }; async function sendWNSNotification( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion, stateVersion } = notificationInfo; const response = await wnsPush(targetedNotifications); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const wnsIDs = response.wnsIDs ?? []; const delivery: WNSDelivery = { source, deviceType: 'windows', wnsIDs, deviceTokens, codeVersion, errors: response.errors, stateVersion, }; const result: WNSResult = { info: notificationInfo, delivery, invalidTokens: response.invalidTokens, }; return result; } type InvalidToken = { +userID: string, +tokens: $ReadOnlyArray, }; async function removeInvalidTokens( invalidTokens: $ReadOnlyArray, ): Promise { const sqlTuples = invalidTokens.map( invalidTokenUser => SQL`( user = ${invalidTokenUser.userID} AND device_token IN (${invalidTokenUser.tokens}) )`, ); const sqlCondition = mergeOrConditions(sqlTuples); const selectQuery = SQL` SELECT id, user, device_token FROM cookies WHERE `; selectQuery.append(sqlCondition); const [result] = await dbQuery(selectQuery); const userCookiePairsToInvalidDeviceTokens = new Map(); for (const row of result) { const userCookiePair = `${row.user}|${row.id}`; const existing = userCookiePairsToInvalidDeviceTokens.get(userCookiePair); if (existing) { existing.add(row.device_token); } else { userCookiePairsToInvalidDeviceTokens.set( userCookiePair, new Set([row.device_token]), ); } } const time = Date.now(); const promises: Array> = []; for (const entry of userCookiePairsToInvalidDeviceTokens) { const [userCookiePair, deviceTokens] = entry; const [userID, cookieID] = userCookiePair.split('|'); const updateDatas = [...deviceTokens].map(deviceToken => ({ type: updateTypes.BAD_DEVICE_TOKEN, userID, time, deviceToken, targetCookie: cookieID, })); promises.push(createUpdates(updateDatas)); } const updateQuery = SQL` UPDATE cookies SET device_token = NULL WHERE `; updateQuery.append(sqlCondition); promises.push(dbQuery(updateQuery)); await Promise.all(promises); } async function updateBadgeCount( viewer: Viewer, source: 'mark_as_unread' | 'mark_as_read' | 'activity_update', ) { const { userID } = viewer; const deviceTokenQuery = SQL` SELECT platform, device_token, versions, id FROM cookies WHERE user = ${userID} AND device_token IS NOT NULL `; if (viewer.data.cookieID) { deviceTokenQuery.append(SQL`AND id != ${viewer.cookieID} `); } const [unreadCounts, [deviceTokenResult], [dbID]] = await Promise.all([ getUnreadCounts([userID]), dbQuery(deviceTokenQuery), createIDs('notifications', 1), ]); const unreadCount = unreadCounts[userID]; const devices = deviceTokenResult.map(row => { const versions = JSON.parse(row.versions); return { platform: row.platform, cookieID: row.id, deviceToken: row.device_token, codeVersion: versions?.codeVersion, stateVersion: versions?.stateVersion, }; }); const byPlatform = getDevicesByPlatform(devices); const preparePromises: Array>> = []; const iosVersionsToTokens = byPlatform.get('ios'); if (iosVersionsToTokens) { for (const [versionKey, deviceInfos] of iosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'ios', codeVersion, stateVersion, }); notification.badge = unreadCount; notification.pushType = 'alert'; const preparePromise: Promise = (async () => { let targetedNotifications: $ReadOnlyArray; if (codeVersion > 222) { const notificationsArray = await prepareEncryptedIOSNotifications( deviceInfos, notification, codeVersion, ); targetedNotifications = notificationsArray.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); } else { targetedNotifications = deviceInfos.map(({ deviceToken }) => ({ notification, deviceToken, })); } return targetedNotifications.map(targetedNotification => ({ notification: targetedNotification, platform: 'ios', notificationInfo: { source, dbID, userID, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const androidVersionsToTokens = byPlatform.get('android'); if (androidVersionsToTokens) { for (const [versionKey, deviceInfos] of androidVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const notificationData = codeVersion < 69 ? { badge: unreadCount.toString() } : { badge: unreadCount.toString(), badgeOnly: '1' }; const notification = { data: notificationData }; const preparePromise: Promise = (async () => { let targetedNotifications: $ReadOnlyArray; if (codeVersion > 222) { const notificationsArray = await prepareEncryptedAndroidNotifications( deviceInfos, notification, ); targetedNotifications = notificationsArray.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); } else { targetedNotifications = deviceInfos.map(({ deviceToken }) => ({ deviceToken, notification, })); } return targetedNotifications.map(targetedNotification => ({ notification: targetedNotification, platform: 'android', notificationInfo: { source, dbID, userID, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const macosVersionsToTokens = byPlatform.get('macos'); if (macosVersionsToTokens) { for (const [versionKey, deviceInfos] of macosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'macos', codeVersion, stateVersion, }); notification.badge = unreadCount; notification.pushType = 'alert'; const preparePromise: Promise = (async () => { return deviceInfos.map(({ deviceToken }) => ({ notification: ({ deviceToken, notification, }: TargetedAPNsNotification), platform: 'macos', notificationInfo: { source, dbID, userID, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const prepareResults = await Promise.all(preparePromises); const flattenedPrepareResults = prepareResults.filter(Boolean).flat(); const deliveryResults = await deliverPushNotifsInEncryptionOrder( flattenedPrepareResults, ); await saveNotifResults(deliveryResults, new Map(), false); } export { sendPushNotifs, sendRescindNotifs, updateBadgeCount }; diff --git a/keyserver/src/responders/redux-state-responders.js b/keyserver/src/responders/redux-state-responders.js index 072e90685..903e04ec0 100644 --- a/keyserver/src/responders/redux-state-responders.js +++ b/keyserver/src/responders/redux-state-responders.js @@ -1,363 +1,364 @@ // @flow import _keyBy from 'lodash/fp/keyBy.js'; import t, { type TInterface } from 'tcomb'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { daysToEntriesFromEntryInfos } from 'lib/reducers/entry-reducer.js'; import { freshMessageStore } from 'lib/reducers/message-reducer.js'; import { mostRecentlyReadThread } from 'lib/selectors/thread-selectors.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { threadHasPermission, threadIsPending, parsePendingThreadID, createPendingThread, } from 'lib/shared/thread-utils.js'; import { canUseDatabaseOnWeb } from 'lib/shared/web-database.js'; import { entryStoreValidator } from 'lib/types/entry-types.js'; import { defaultCalendarFilters } from 'lib/types/filter-types.js'; import { inviteLinksStoreValidator, type CommunityLinks, } from 'lib/types/link-types.js'; import { defaultNumberPerThread, messageStoreValidator, + type MessageStore, } from 'lib/types/message-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { threadTypes } from 'lib/types/thread-types-enum.js'; import { threadStoreValidator } from 'lib/types/thread-types.js'; import { currentUserInfoValidator, userInfosValidator, type GlobalAccountUserInfo, } from 'lib/types/user-types.js'; import { currentDateInTimeZone } from 'lib/utils/date-utils.js'; import { ServerError } from 'lib/utils/errors.js'; import { promiseAll } from 'lib/utils/promises.js'; import { urlInfoValidator } from 'lib/utils/url-utils.js'; import { tShape, ashoatKeyserverID } from 'lib/utils/validation-utils.js'; import { navInfoValidator } from 'web/types/nav-types.js'; import type { InitialReduxStateResponse, InitialKeyserverInfo, InitialReduxStateRequest, ExcludedData, } from 'web/types/redux-types.js'; import { navInfoFromURL } from 'web/url-utils.js'; import { fetchEntryInfos } from '../fetchers/entry-fetchers.js'; import { fetchPrimaryInviteLinks } from '../fetchers/link-fetchers.js'; import { fetchMessageInfos } from '../fetchers/message-fetchers.js'; import { hasAnyNotAcknowledgedPolicies } from '../fetchers/policy-acknowledgment-fetchers.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchCurrentUserInfo, fetchKnownUserInfos, fetchUserInfos, } from '../fetchers/user-fetchers.js'; import { getWebPushConfig } from '../push/providers.js'; import { setNewSession } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; const excludedDataValidator: TInterface = tShape({ threadStore: t.maybe(t.Bool), }); export const initialReduxStateRequestValidator: TInterface = tShape({ urlInfo: urlInfoValidator, excludedData: excludedDataValidator, }); const initialKeyserverInfoValidator = tShape({ sessionID: t.maybe(t.String), updatesCurrentAsOf: t.Number, }); export const initialReduxStateValidator: TInterface = tShape({ navInfo: navInfoValidator, currentUserInfo: currentUserInfoValidator, entryStore: entryStoreValidator, threadStore: threadStoreValidator, userInfos: userInfosValidator, messageStore: messageStoreValidator, pushApiPublicKey: t.maybe(t.String), commServicesAccessToken: t.Nil, inviteLinksStore: inviteLinksStoreValidator, keyserverInfo: initialKeyserverInfoValidator, }); async function getInitialReduxStateResponder( viewer: Viewer, request: InitialReduxStateRequest, ): Promise { const { urlInfo, excludedData } = request; const useDatabase = viewer.loggedIn && canUseDatabaseOnWeb(viewer.userID); const hasNotAcknowledgedPoliciesPromise = hasAnyNotAcknowledgedPolicies( viewer.id, baseLegalPolicies, ); const initialNavInfoPromise = (async () => { try { let backupInfo = { now: currentDateInTimeZone(viewer.timeZone), }; // Some user ids in selectedUserList might not exist in the userInfos // (e.g. they were included in the results of the user search endpoint) // Because of that we keep their userInfos inside the navInfo. if (urlInfo.selectedUserList) { const fetchedUserInfos = await fetchUserInfos(urlInfo.selectedUserList); const userInfos: { [string]: GlobalAccountUserInfo } = {}; for (const userID in fetchedUserInfos) { const userInfo = fetchedUserInfos[userID]; if (userInfo.username) { userInfos[userID] = { ...userInfo, username: userInfo.username, }; } } backupInfo = { userInfos, ...backupInfo }; } return navInfoFromURL(urlInfo, backupInfo); } catch (e) { throw new ServerError(e.message); } })(); const calendarQueryPromise = (async () => { const initialNavInfo = await initialNavInfoPromise; return { startDate: initialNavInfo.startDate, endDate: initialNavInfo.endDate, filters: defaultCalendarFilters, }; })(); const messageSelectionCriteria = { joinedThreads: true }; const initialTime = Date.now(); const threadInfoPromise = fetchThreadInfos(viewer); const messageInfoPromise = fetchMessageInfos( viewer, messageSelectionCriteria, defaultNumberPerThread, ); const entryInfoPromise = (async () => { const calendarQuery = await calendarQueryPromise; return await fetchEntryInfos(viewer, [calendarQuery]); })(); const currentUserInfoPromise = fetchCurrentUserInfo(viewer); const userInfoPromise = fetchKnownUserInfos(viewer); const sessionIDPromise = (async () => { const calendarQuery = await calendarQueryPromise; if (viewer.loggedIn) { await setNewSession(viewer, calendarQuery, initialTime); } return viewer.sessionID; })(); const threadStorePromise = (async () => { if (excludedData.threadStore && useDatabase) { return { threadInfos: {} }; } const [{ threadInfos }, hasNotAcknowledgedPolicies] = await Promise.all([ threadInfoPromise, hasNotAcknowledgedPoliciesPromise, ]); return { threadInfos: hasNotAcknowledgedPolicies ? {} : threadInfos }; })(); - const messageStorePromise = (async () => { + const messageStorePromise: Promise = (async () => { const [ { threadInfos }, { rawMessageInfos, truncationStatuses }, hasNotAcknowledgedPolicies, ] = await Promise.all([ threadInfoPromise, messageInfoPromise, hasNotAcknowledgedPoliciesPromise, ]); if (hasNotAcknowledgedPolicies) { return { messages: {}, threads: {}, local: {}, currentAsOf: { [ashoatKeyserverID]: 0 }, }; } const { messageStore: freshStore } = freshMessageStore( rawMessageInfos, truncationStatuses, { [ashoatKeyserverID]: mostRecentMessageTimestamp( rawMessageInfos, initialTime, ), }, threadInfos, ); return freshStore; })(); const entryStorePromise = (async () => { const [{ rawEntryInfos }, hasNotAcknowledgedPolicies] = await Promise.all([ entryInfoPromise, hasNotAcknowledgedPoliciesPromise, ]); if (hasNotAcknowledgedPolicies) { return { entryInfos: {}, daysToEntries: {}, lastUserInteractionCalendar: 0, }; } return { entryInfos: _keyBy('id')(rawEntryInfos), daysToEntries: daysToEntriesFromEntryInfos(rawEntryInfos), lastUserInteractionCalendar: initialTime, }; })(); const userInfosPromise = (async () => { const [userInfos, hasNotAcknowledgedPolicies] = await Promise.all([ userInfoPromise, hasNotAcknowledgedPoliciesPromise, ]); return hasNotAcknowledgedPolicies ? {} : userInfos; })(); const navInfoPromise = (async () => { const [ { threadInfos }, messageStore, currentUserInfo, userInfos, finalNavInfo, ] = await Promise.all([ threadInfoPromise, messageStorePromise, currentUserInfoPromise, userInfosPromise, initialNavInfoPromise, ]); const requestedActiveChatThreadID = finalNavInfo.activeChatThreadID; if ( requestedActiveChatThreadID && !threadIsPending(requestedActiveChatThreadID) && !threadHasPermission( threadInfos[requestedActiveChatThreadID], threadPermissions.VISIBLE, ) ) { finalNavInfo.activeChatThreadID = null; } if (!finalNavInfo.activeChatThreadID) { const mostRecentThread = mostRecentlyReadThread( messageStore, threadInfos, ); if (mostRecentThread) { finalNavInfo.activeChatThreadID = mostRecentThread; } } if ( finalNavInfo.activeChatThreadID && threadIsPending(finalNavInfo.activeChatThreadID) && finalNavInfo.pendingThread?.id !== finalNavInfo.activeChatThreadID ) { const pendingThreadData = parsePendingThreadID( finalNavInfo.activeChatThreadID, ); if ( pendingThreadData && pendingThreadData.threadType !== threadTypes.SIDEBAR && currentUserInfo.id ) { const members = [...pendingThreadData.memberIDs, currentUserInfo.id] .map(id => { const userInfo = userInfos[id]; if (!userInfo || !userInfo.username) { return undefined; } const { username } = userInfo; return { id, username }; }) .filter(Boolean); const newPendingThread = createPendingThread({ viewerID: currentUserInfo.id, threadType: pendingThreadData.threadType, members, }); finalNavInfo.activeChatThreadID = newPendingThread.id; finalNavInfo.pendingThread = newPendingThread; } } return finalNavInfo; })(); const currentAsOfPromise = (async () => { const hasNotAcknowledgedPolicies = await hasNotAcknowledgedPoliciesPromise; return hasNotAcknowledgedPolicies ? 0 : initialTime; })(); - const pushApiPublicKeyPromise = (async () => { + const pushApiPublicKeyPromise: Promise = (async () => { const pushConfig = await getWebPushConfig(); if (!pushConfig) { if (process.env.NODE_ENV !== 'development') { console.warn('keyserver/secrets/web_push_config.json should exist'); } return null; } return pushConfig.publicKey; })(); const inviteLinksStorePromise = (async () => { const primaryInviteLinks = await fetchPrimaryInviteLinks(viewer); const links: { [string]: CommunityLinks } = {}; for (const link of primaryInviteLinks) { if (link.primary) { links[link.communityID] = { primaryLink: link, }; } } return { links, }; })(); const keyserverInfoPromise = (async () => { const { sessionID, updatesCurrentAsOf } = await promiseAll({ sessionID: sessionIDPromise, updatesCurrentAsOf: currentAsOfPromise, }); return { sessionID, updatesCurrentAsOf, }; })(); const initialReduxState: InitialReduxStateResponse = await promiseAll({ navInfo: navInfoPromise, currentUserInfo: currentUserInfoPromise, entryStore: entryStorePromise, threadStore: threadStorePromise, userInfos: userInfosPromise, messageStore: messageStorePromise, pushApiPublicKey: pushApiPublicKeyPromise, commServicesAccessToken: null, inviteLinksStore: inviteLinksStorePromise, keyserverInfo: keyserverInfoPromise, }); return initialReduxState; } export { getInitialReduxStateResponder }; diff --git a/keyserver/src/responders/user-responders.js b/keyserver/src/responders/user-responders.js index d48afefbb..84ab7eaa6 100644 --- a/keyserver/src/responders/user-responders.js +++ b/keyserver/src/responders/user-responders.js @@ -1,763 +1,772 @@ // @flow import type { Utility as OlmUtility } from '@commapp/olm'; import invariant from 'invariant'; import { ErrorTypes, SiweMessage } from 'siwe'; import t, { type TInterface, type TUnion } from 'tcomb'; import bcrypt from 'twin-bcrypt'; import { baseLegalPolicies, policies, policyTypeValidator, } from 'lib/facts/policies.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { ResetPasswordRequest, LogOutResponse, RegisterResponse, RegisterRequest, LogInResponse, LogInRequest, UpdatePasswordRequest, UpdateUserSettingsRequest, PolicyAcknowledgmentRequest, ClaimUsernameResponse, } from 'lib/types/account-types.js'; import { userSettingsTypes, notificationTypeValues, logInActionSources, } from 'lib/types/account-types.js'; import { type ClientAvatar, clientAvatarValidator, type UpdateUserAvatarResponse, type UpdateUserAvatarRequest, } from 'lib/types/avatar-types.js'; import type { ReservedUsernameMessage, IdentityKeysBlob, SignedIdentityKeysBlob, } from 'lib/types/crypto-types.js'; import { type CalendarQuery, rawEntryInfoValidator, + type FetchEntryInfosBase, } from 'lib/types/entry-types.js'; import { defaultNumberPerThread, rawMessageInfoValidator, messageTruncationStatusesValidator, } from 'lib/types/message-types.js'; import type { SIWEAuthRequest, SIWEMessage, SIWESocialProof, } from 'lib/types/siwe-types.js'; import { type SubscriptionUpdateRequest, type SubscriptionUpdateResponse, threadSubscriptionValidator, } from 'lib/types/subscription-types.js'; import { rawThreadInfoValidator } from 'lib/types/thread-types.js'; import { createUpdatesResultValidator } from 'lib/types/update-types.js'; import { type PasswordUpdate, loggedOutUserInfoValidator, loggedInUserInfoValidator, userInfoValidator, } from 'lib/types/user-types.js'; import { identityKeysBlobValidator, signedIdentityKeysBlobValidator, } from 'lib/utils/crypto-utils.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; -import { promiseAll } from 'lib/utils/promises.js'; import { getPublicKeyFromSIWEStatement, isValidSIWEMessage, isValidSIWEStatementWithPublicKey, primaryIdentityPublicKeyRegex, } from 'lib/utils/siwe-utils.js'; import { tShape, tPlatformDetails, tPassword, tEmail, tOldValidUsername, tRegex, tID, } from 'lib/utils/validation-utils.js'; import { entryQueryInputValidator, newEntryQueryInputValidator, normalizeCalendarQuery, verifyCalendarQueryThreadIDs, } from './entry-responders.js'; import { createAccount, processSIWEAccountCreation, } from '../creators/account-creator.js'; import { createOlmSession } from '../creators/olm-session-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { deleteAccount } from '../deleters/account-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { checkAndInvalidateSIWENonceEntry } from '../deleters/siwe-nonce-deleters.js'; import { fetchEntryInfos } from '../fetchers/entry-fetchers.js'; import { fetchMessageInfos } from '../fetchers/message-fetchers.js'; import { fetchNotAcknowledgedPolicies } from '../fetchers/policy-acknowledgment-fetchers.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchKnownUserInfos, fetchLoggedInUserInfo, fetchUserIDForEthereumAddress, fetchUsername, } from '../fetchers/user-fetchers.js'; import { createNewAnonymousCookie, createNewUserCookie, setNewSession, } from '../session/cookies.js'; import type { Viewer } from '../session/viewer.js'; import { accountUpdater, checkAndSendVerificationEmail, checkAndSendPasswordResetEmail, updatePassword, updateUserSettings, updateUserAvatar, } from '../updaters/account-updaters.js'; import { fetchOlmAccount } from '../updaters/olm-account-updater.js'; import { userSubscriptionUpdater } from '../updaters/user-subscription-updaters.js'; import { viewerAcknowledgmentUpdater } from '../updaters/viewer-acknowledgment-updater.js'; import { getOlmUtility } from '../utils/olm-utils.js'; export const subscriptionUpdateRequestInputValidator: TInterface = tShape({ threadID: tID, updatedFields: tShape({ pushNotifs: t.maybe(t.Boolean), home: t.maybe(t.Boolean), }), }); export const subscriptionUpdateResponseValidator: TInterface = tShape({ threadSubscription: threadSubscriptionValidator, }); async function userSubscriptionUpdateResponder( viewer: Viewer, request: SubscriptionUpdateRequest, ): Promise { const threadSubscription = await userSubscriptionUpdater(viewer, request); return { threadSubscription, }; } export const accountUpdateInputValidator: TInterface = tShape({ updatedFields: tShape({ email: t.maybe(tEmail), password: t.maybe(tPassword), }), currentPassword: tPassword, }); async function passwordUpdateResponder( viewer: Viewer, request: PasswordUpdate, ): Promise { await accountUpdater(viewer, request); } async function sendVerificationEmailResponder(viewer: Viewer): Promise { await checkAndSendVerificationEmail(viewer); } export const resetPasswordRequestInputValidator: TInterface = tShape({ usernameOrEmail: t.union([tEmail, tOldValidUsername]), }); async function sendPasswordResetEmailResponder( viewer: Viewer, request: ResetPasswordRequest, ): Promise { await checkAndSendPasswordResetEmail(request); } export const logOutResponseValidator: TInterface = tShape({ currentUserInfo: loggedOutUserInfoValidator, }); async function logOutResponder(viewer: Viewer): Promise { if (viewer.loggedIn) { const [anonymousViewerData] = await Promise.all([ createNewAnonymousCookie({ platformDetails: viewer.platformDetails, deviceToken: viewer.deviceToken, }), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(anonymousViewerData); } return { currentUserInfo: { anonymous: true, }, }; } async function accountDeletionResponder( viewer: Viewer, ): Promise { const result = await deleteAccount(viewer); invariant(result, 'deleteAccount should return result if handed request'); return result; } const deviceTokenUpdateRequestInputValidator = tShape({ deviceType: t.maybe(t.enums.of(['ios', 'android'])), deviceToken: t.String, }); export const registerRequestInputValidator: TInterface = tShape({ username: t.String, email: t.maybe(tEmail), password: tPassword, calendarQuery: t.maybe(newEntryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, // We include `primaryIdentityPublicKey` to avoid breaking // old clients, but we no longer do anything with it. primaryIdentityPublicKey: t.maybe(tRegex(primaryIdentityPublicKeyRegex)), signedIdentityKeysBlob: t.maybe(signedIdentityKeysBlobValidator), initialNotificationsEncryptedMessage: t.maybe(t.String), }); export const registerResponseValidator: TInterface = tShape({ id: t.String, rawMessageInfos: t.list(rawMessageInfoValidator), currentUserInfo: loggedInUserInfoValidator, cookieChange: tShape({ threadInfos: t.dict(tID, rawThreadInfoValidator), userInfos: t.list(userInfoValidator), }), }); async function accountCreationResponder( viewer: Viewer, request: RegisterRequest, ): Promise { const { signedIdentityKeysBlob } = request; if (signedIdentityKeysBlob) { const identityKeys: IdentityKeysBlob = JSON.parse( signedIdentityKeysBlob.payload, ); if (!identityKeysBlobValidator.is(identityKeys)) { throw new ServerError('invalid_identity_keys_blob'); } const olmUtil: OlmUtility = getOlmUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); } catch (e) { throw new ServerError('invalid_signature'); } } return await createAccount(viewer, request); } type ProcessSuccessfulLoginParams = { +viewer: Viewer, +input: any, +userID: string, +calendarQuery: ?CalendarQuery, +socialProof?: ?SIWESocialProof, +signedIdentityKeysBlob?: ?SignedIdentityKeysBlob, +initialNotificationsEncryptedMessage?: string, }; async function processSuccessfulLogin( params: ProcessSuccessfulLoginParams, ): Promise { const { viewer, input, userID, calendarQuery, socialProof, signedIdentityKeysBlob, initialNotificationsEncryptedMessage, } = params; const request: LogInRequest = input; const newServerTime = Date.now(); const deviceToken = request.deviceTokenUpdateRequest ? request.deviceTokenUpdateRequest.deviceToken : viewer.deviceToken; const [userViewerData, notAcknowledgedPolicies] = await Promise.all([ createNewUserCookie(userID, { platformDetails: request.platformDetails, deviceToken, socialProof, signedIdentityKeysBlob, }), fetchNotAcknowledgedPolicies(userID, baseLegalPolicies), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(userViewerData); if ( notAcknowledgedPolicies.length && hasMinCodeVersion(viewer.platformDetails, { native: 181 }) ) { const currentUserInfo = await fetchLoggedInUserInfo(viewer); return { notAcknowledgedPolicies, currentUserInfo: currentUserInfo, rawMessageInfos: [], truncationStatuses: {}, userInfos: [], rawEntryInfos: [], serverTime: 0, cookieChange: { threadInfos: {}, userInfos: [], }, }; } if (calendarQuery) { await setNewSession(viewer, calendarQuery, newServerTime); } const olmSessionPromise = (async () => { if ( userViewerData.cookieID && initialNotificationsEncryptedMessage && signedIdentityKeysBlob ) { await createOlmSession( initialNotificationsEncryptedMessage, 'notifications', userViewerData.cookieID, ); } })(); const threadCursors: { [string]: null } = {}; for (const watchedThreadID of request.watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true }; + const entriesPromise: Promise = (async () => { + if (!calendarQuery) { + return undefined; + } + return await fetchEntryInfos(viewer, [calendarQuery]); + })(); + const [ threadsResult, messagesResult, entriesResult, userInfos, currentUserInfo, ] = await Promise.all([ fetchThreadInfos(viewer), fetchMessageInfos(viewer, messageSelectionCriteria, defaultNumberPerThread), - calendarQuery ? fetchEntryInfos(viewer, [calendarQuery]) : undefined, + entriesPromise, fetchKnownUserInfos(viewer), fetchLoggedInUserInfo(viewer), olmSessionPromise, ]); const rawEntryInfos = entriesResult ? entriesResult.rawEntryInfos : null; const response: LogInResponse = { currentUserInfo, rawMessageInfos: messagesResult.rawMessageInfos, truncationStatuses: messagesResult.truncationStatuses, serverTime: newServerTime, userInfos: values(userInfos), cookieChange: { threadInfos: threadsResult.threadInfos, userInfos: [], }, }; if (rawEntryInfos) { return { ...response, rawEntryInfos, }; } return response; } export const logInRequestInputValidator: TInterface = tShape({ username: t.maybe(t.String), usernameOrEmail: t.maybe(t.union([tEmail, tOldValidUsername])), password: tPassword, watchedIDs: t.list(tID), calendarQuery: t.maybe(entryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, source: t.maybe(t.enums.of(values(logInActionSources))), // We include `primaryIdentityPublicKey` to avoid breaking // old clients, but we no longer do anything with it. primaryIdentityPublicKey: t.maybe(tRegex(primaryIdentityPublicKeyRegex)), signedIdentityKeysBlob: t.maybe(signedIdentityKeysBlobValidator), initialNotificationsEncryptedMessage: t.maybe(t.String), }); export const logInResponseValidator: TInterface = tShape({ currentUserInfo: loggedInUserInfoValidator, rawMessageInfos: t.list(rawMessageInfoValidator), truncationStatuses: messageTruncationStatusesValidator, userInfos: t.list(userInfoValidator), rawEntryInfos: t.maybe(t.list(rawEntryInfoValidator)), serverTime: t.Number, cookieChange: tShape({ threadInfos: t.dict(tID, rawThreadInfoValidator), userInfos: t.list(userInfoValidator), }), notAcknowledgedPolicies: t.maybe(t.list(policyTypeValidator)), }); async function logInResponder( viewer: Viewer, request: LogInRequest, ): Promise { let identityKeys: ?IdentityKeysBlob; const { signedIdentityKeysBlob, initialNotificationsEncryptedMessage } = request; if (signedIdentityKeysBlob) { identityKeys = JSON.parse(signedIdentityKeysBlob.payload); const olmUtil: OlmUtility = getOlmUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); } catch (e) { throw new ServerError('invalid_signature'); } } const calendarQuery = request.calendarQuery ? normalizeCalendarQuery(request.calendarQuery) : null; - const promises = {}; - if (calendarQuery) { - promises.verifyCalendarQueryThreadIDs = - verifyCalendarQueryThreadIDs(calendarQuery); - } + const verifyCalendarQueryThreadIDsPromise = (async () => { + if (calendarQuery) { + await verifyCalendarQueryThreadIDs(calendarQuery); + } + })(); + const username = request.username ?? request.usernameOrEmail; if (!username) { if (hasMinCodeVersion(viewer.platformDetails, { native: 150 })) { throw new ServerError('invalid_credentials'); } else { throw new ServerError('invalid_parameters'); } } const userQuery = SQL` SELECT id, hash, username FROM users WHERE LCASE(username) = LCASE(${username}) `; - promises.userQuery = dbQuery(userQuery); - const { - userQuery: [userResult], - } = await promiseAll(promises); + const userQueryPromise = dbQuery(userQuery); + const [[userResult]] = await Promise.all([ + userQueryPromise, + verifyCalendarQueryThreadIDsPromise, + ]); if (userResult.length === 0) { if (hasMinCodeVersion(viewer.platformDetails, { native: 150 })) { throw new ServerError('invalid_credentials'); } else { throw new ServerError('invalid_parameters'); } } const userRow = userResult[0]; if (!userRow.hash || !bcrypt.compareSync(request.password, userRow.hash)) { throw new ServerError('invalid_credentials'); } const id = userRow.id.toString(); return await processSuccessfulLogin({ viewer, input: request, userID: id, calendarQuery, signedIdentityKeysBlob, initialNotificationsEncryptedMessage, }); } export const siweAuthRequestInputValidator: TInterface = tShape({ signature: t.String, message: t.String, calendarQuery: entryQueryInputValidator, deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, watchedIDs: t.list(tID), signedIdentityKeysBlob: t.maybe(signedIdentityKeysBlobValidator), initialNotificationsEncryptedMessage: t.maybe(t.String), doNotRegister: t.maybe(t.Boolean), }); async function siweAuthResponder( viewer: Viewer, request: SIWEAuthRequest, ): Promise { const { message, signature, deviceTokenUpdateRequest, platformDetails, signedIdentityKeysBlob, initialNotificationsEncryptedMessage, doNotRegister, } = request; const calendarQuery = normalizeCalendarQuery(request.calendarQuery); // 1. Ensure that `message` is a well formed Comm SIWE Auth message. const siweMessage: SIWEMessage = new SiweMessage(message); if (!isValidSIWEMessage(siweMessage)) { throw new ServerError('invalid_parameters'); } // 2. Ensure that the `nonce` exists in the `siwe_nonces` table // AND hasn't expired. If those conditions are met, delete the entry to // ensure that the same `nonce` can't be re-used in a future request. const wasNonceCheckedAndInvalidated = await checkAndInvalidateSIWENonceEntry( siweMessage.nonce, ); if (!wasNonceCheckedAndInvalidated) { throw new ServerError('invalid_parameters'); } // 3. Validate SIWEMessage signature and handle possible errors. try { await siweMessage.validate(signature); } catch (error) { if (error === ErrorTypes.EXPIRED_MESSAGE) { // Thrown when the `expirationTime` is present and in the past. throw new ServerError('expired_message'); } else if (error === ErrorTypes.INVALID_SIGNATURE) { // Thrown when the `validate()` function can't verify the message. throw new ServerError('invalid_signature'); } else if (error === ErrorTypes.MALFORMED_SESSION) { // Thrown when some required field is missing. throw new ServerError('malformed_session'); } else { throw new ServerError('unknown_error'); } } // 4. Pull `primaryIdentityPublicKey` out from SIWEMessage `statement`. // We expect it to be included for BOTH native and web clients. const { statement } = siweMessage; const primaryIdentityPublicKey = statement && isValidSIWEStatementWithPublicKey(statement) ? getPublicKeyFromSIWEStatement(statement) : null; if (!primaryIdentityPublicKey) { throw new ServerError('invalid_siwe_statement_public_key'); } // 5. Verify `signedIdentityKeysBlob.payload` with included `signature` // if `signedIdentityKeysBlob` was included in the `SIWEAuthRequest`. let identityKeys: ?IdentityKeysBlob; if (signedIdentityKeysBlob) { identityKeys = JSON.parse(signedIdentityKeysBlob.payload); if (!identityKeysBlobValidator.is(identityKeys)) { throw new ServerError('invalid_identity_keys_blob'); } const olmUtil: OlmUtility = getOlmUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); } catch (e) { throw new ServerError('invalid_signature'); } } // 6. Ensure that `primaryIdentityPublicKeys.ed25519` matches SIWE // statement `primaryIdentityPublicKey` if `identityKeys` exists. if ( identityKeys && identityKeys.primaryIdentityPublicKeys.ed25519 !== primaryIdentityPublicKey ) { throw new ServerError('primary_public_key_mismatch'); } // 7. Construct `SIWESocialProof` object with the stringified // SIWEMessage and the corresponding signature. const socialProof: SIWESocialProof = { siweMessage: siweMessage.toMessage(), siweMessageSignature: signature, }; // 8. Create account with call to `processSIWEAccountCreation(...)` // if address does not correspond to an existing user. let userID = await fetchUserIDForEthereumAddress(siweMessage.address); if (!userID && doNotRegister) { throw new ServerError('account_does_not_exist'); } else if (!userID) { const siweAccountCreationRequest = { address: siweMessage.address, calendarQuery, deviceTokenUpdateRequest, platformDetails, socialProof, }; userID = await processSIWEAccountCreation( viewer, siweAccountCreationRequest, ); } // 9. Complete login with call to `processSuccessfulLogin(...)`. return await processSuccessfulLogin({ viewer, input: request, userID, calendarQuery, socialProof, signedIdentityKeysBlob, initialNotificationsEncryptedMessage, }); } export const updatePasswordRequestInputValidator: TInterface = tShape({ code: t.String, password: tPassword, watchedIDs: t.list(tID), calendarQuery: t.maybe(entryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, }); async function oldPasswordUpdateResponder( viewer: Viewer, request: UpdatePasswordRequest, ): Promise { if (request.calendarQuery) { request.calendarQuery = normalizeCalendarQuery(request.calendarQuery); } return await updatePassword(viewer, request); } export const updateUserSettingsInputValidator: TInterface = tShape({ name: t.irreducible( userSettingsTypes.DEFAULT_NOTIFICATIONS, x => x === userSettingsTypes.DEFAULT_NOTIFICATIONS, ), data: t.enums.of(notificationTypeValues), }); async function updateUserSettingsResponder( viewer: Viewer, request: UpdateUserSettingsRequest, ): Promise { await updateUserSettings(viewer, request); } export const policyAcknowledgmentRequestInputValidator: TInterface = tShape({ policy: t.maybe(t.enums.of(policies)), }); async function policyAcknowledgmentResponder( viewer: Viewer, request: PolicyAcknowledgmentRequest, ): Promise { await viewerAcknowledgmentUpdater(viewer, request.policy); } export const updateUserAvatarResponseValidator: TInterface = tShape({ updates: createUpdatesResultValidator, }); export const updateUserAvatarResponderValidator: TUnion< ?ClientAvatar | UpdateUserAvatarResponse, > = t.union([ t.maybe(clientAvatarValidator), updateUserAvatarResponseValidator, ]); async function updateUserAvatarResponder( viewer: Viewer, request: UpdateUserAvatarRequest, ): Promise { return await updateUserAvatar(viewer, request); } export const claimUsernameResponseValidator: TInterface = tShape({ message: t.String, signature: t.String, }); async function claimUsernameResponder( viewer: Viewer, ): Promise { const [username, accountInfo] = await Promise.all([ fetchUsername(viewer.userID), fetchOlmAccount('content'), ]); if (!username) { throw new ServerError('invalid_credentials'); } const issuedAt = new Date().toISOString(); const reservedUsernameMessage: ReservedUsernameMessage = { statement: 'This user is the owner of the following username and user ID', payload: { username, userID: viewer.userID, }, issuedAt, }; const message = JSON.stringify(reservedUsernameMessage); const signature = accountInfo.account.sign(message); return { message, signature }; } export { userSubscriptionUpdateResponder, passwordUpdateResponder, sendVerificationEmailResponder, sendPasswordResetEmailResponder, logOutResponder, accountDeletionResponder, accountCreationResponder, logInResponder, siweAuthResponder, oldPasswordUpdateResponder, updateUserSettingsResponder, policyAcknowledgmentResponder, updateUserAvatarResponder, claimUsernameResponder, }; diff --git a/keyserver/src/session/cookies.js b/keyserver/src/session/cookies.js index 012bd4844..24c086d03 100644 --- a/keyserver/src/session/cookies.js +++ b/keyserver/src/session/cookies.js @@ -1,789 +1,795 @@ // @flow import crypto from 'crypto'; import type { $Response, $Request } from 'express'; import invariant from 'invariant'; import url from 'url'; import { isStaff } from 'lib/shared/staff-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { Shape } from 'lib/types/core.js'; import type { SignedIdentityKeysBlob } from 'lib/types/crypto-types.js'; import type { Platform, PlatformDetails } from 'lib/types/device-types.js'; import type { CalendarQuery } from 'lib/types/entry-types.js'; import { type ServerSessionChange, cookieLifetime, cookieTypes, sessionIdentifierTypes, type SessionIdentifierType, } from 'lib/types/session-types.js'; import type { SIWESocialProof } from 'lib/types/siwe-types.js'; import type { InitialClientSocketMessage } from 'lib/types/socket-types.js'; import type { UserInfo } from 'lib/types/user-types.js'; import { isDev } from 'lib/utils/dev-utils.js'; import { values } from 'lib/utils/objects.js'; import { isBcryptHash, getCookieHash, verifyCookieHash, } from './cookie-hash.js'; import { Viewer } from './viewer.js'; import type { AnonymousViewerData, UserViewerData } from './viewer.js'; import createIDs from '../creators/id-creator.js'; import { createSession } from '../creators/session-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { clearDeviceToken } from '../updaters/device-token-updaters.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { type AppURLFacts, getAppURLFactsFromRequestURL, } from '../utils/urls.js'; function cookieIsExpired(lastUsed: number) { return lastUsed + cookieLifetime <= Date.now(); } type SessionParameterInfo = { isSocket: boolean, sessionID: ?string, sessionIdentifierType: SessionIdentifierType, ipAddress: string, userAgent: ?string, }; type FetchViewerResult = | { +type: 'valid', +viewer: Viewer } | InvalidFetchViewerResult; type InvalidFetchViewerResult = | { +type: 'nonexistant', +cookieName: ?string, +sessionParameterInfo: SessionParameterInfo, } | { +type: 'invalidated', +cookieName: string, +cookieID: string, +sessionParameterInfo: SessionParameterInfo, +platformDetails: ?PlatformDetails, +deviceToken: ?string, }; async function fetchUserViewer( cookie: string, sessionParameterInfo: SessionParameterInfo, ): Promise { const [cookieID, cookiePassword] = cookie.split(':'); if (!cookieID || !cookiePassword) { return { type: 'nonexistant', cookieName: cookieTypes.USER, sessionParameterInfo, }; } const query = SQL` SELECT hash, user, last_used, platform, device_token, versions FROM cookies WHERE id = ${cookieID} AND user IS NOT NULL `; const [[result], allSessionInfo] = await Promise.all([ dbQuery(query), fetchSessionInfo(sessionParameterInfo, cookieID), ]); if (result.length === 0) { return { type: 'nonexistant', cookieName: cookieTypes.USER, sessionParameterInfo, }; } let sessionID = null, sessionInfo = null; if (allSessionInfo) { ({ sessionID, ...sessionInfo } = allSessionInfo); } const cookieRow = result[0]; let platformDetails = null; if (cookieRow.versions) { const versions = JSON.parse(cookieRow.versions); platformDetails = { platform: cookieRow.platform, codeVersion: versions.codeVersion, stateVersion: versions.stateVersion, }; } else { platformDetails = { platform: cookieRow.platform }; } const deviceToken = cookieRow.device_token; const cookieHash = cookieRow.hash; if ( !verifyCookieHash(cookiePassword, cookieHash) || cookieIsExpired(cookieRow.last_used) ) { return { type: 'invalidated', cookieName: cookieTypes.USER, cookieID, sessionParameterInfo, platformDetails, deviceToken, }; } const userID = cookieRow.user.toString(); const viewer = new Viewer({ isSocket: sessionParameterInfo.isSocket, loggedIn: true, id: userID, platformDetails, deviceToken, userID, cookieID, cookiePassword, cookieHash, sessionIdentifierType: sessionParameterInfo.sessionIdentifierType, sessionID, sessionInfo, isScriptViewer: false, ipAddress: sessionParameterInfo.ipAddress, userAgent: sessionParameterInfo.userAgent, }); return { type: 'valid', viewer }; } async function fetchAnonymousViewer( cookie: string, sessionParameterInfo: SessionParameterInfo, ): Promise { const [cookieID, cookiePassword] = cookie.split(':'); if (!cookieID || !cookiePassword) { return { type: 'nonexistant', cookieName: cookieTypes.ANONYMOUS, sessionParameterInfo, }; } const query = SQL` SELECT last_used, hash, platform, device_token, versions FROM cookies WHERE id = ${cookieID} AND user IS NULL `; const [[result], allSessionInfo] = await Promise.all([ dbQuery(query), fetchSessionInfo(sessionParameterInfo, cookieID), ]); if (result.length === 0) { return { type: 'nonexistant', cookieName: cookieTypes.ANONYMOUS, sessionParameterInfo, }; } let sessionID = null, sessionInfo = null; if (allSessionInfo) { ({ sessionID, ...sessionInfo } = allSessionInfo); } const cookieRow = result[0]; let platformDetails = null; if (cookieRow.platform && cookieRow.versions) { const versions = JSON.parse(cookieRow.versions); platformDetails = { platform: cookieRow.platform, codeVersion: versions.codeVersion, stateVersion: versions.stateVersion, }; } else if (cookieRow.platform) { platformDetails = { platform: cookieRow.platform }; } const deviceToken = cookieRow.device_token; const cookieHash = cookieRow.hash; if ( !verifyCookieHash(cookiePassword, cookieHash) || cookieIsExpired(cookieRow.last_used) ) { return { type: 'invalidated', cookieName: cookieTypes.ANONYMOUS, cookieID, sessionParameterInfo, platformDetails, deviceToken, }; } const viewer = new Viewer({ isSocket: sessionParameterInfo.isSocket, loggedIn: false, id: cookieID, platformDetails, deviceToken, cookieID, cookiePassword, cookieHash, sessionIdentifierType: sessionParameterInfo.sessionIdentifierType, sessionID, sessionInfo, isScriptViewer: false, ipAddress: sessionParameterInfo.ipAddress, userAgent: sessionParameterInfo.userAgent, }); return { type: 'valid', viewer }; } type SessionInfo = { +sessionID: ?string, +lastValidated: number, +lastUpdate: number, +calendarQuery: CalendarQuery, }; async function fetchSessionInfo( sessionParameterInfo: SessionParameterInfo, cookieID: string, ): Promise { const { sessionID } = sessionParameterInfo; const session = sessionID !== undefined ? sessionID : cookieID; if (!session) { return null; } const query = SQL` SELECT query, last_validated, last_update FROM sessions WHERE id = ${session} AND cookie = ${cookieID} `; const [result] = await dbQuery(query); if (result.length === 0) { return null; } return { sessionID, lastValidated: result[0].last_validated, lastUpdate: result[0].last_update, calendarQuery: JSON.parse(result[0].query), }; } async function fetchViewerFromRequestBody( body: mixed, sessionParameterInfo: SessionParameterInfo, ): Promise { if (!body || typeof body !== 'object') { return { type: 'nonexistant', cookieName: null, sessionParameterInfo, }; } const cookiePair = body.cookie; if (cookiePair === null || cookiePair === '') { return { type: 'nonexistant', cookieName: null, sessionParameterInfo, }; } if (!cookiePair || typeof cookiePair !== 'string') { return { type: 'nonexistant', cookieName: null, sessionParameterInfo, }; } const [type, cookie] = cookiePair.split('='); if (type === cookieTypes.USER && cookie) { return await fetchUserViewer(cookie, sessionParameterInfo); } else if (type === cookieTypes.ANONYMOUS && cookie) { return await fetchAnonymousViewer(cookie, sessionParameterInfo); } return { type: 'nonexistant', cookieName: null, sessionParameterInfo, }; } function getRequestIPAddress(req: $Request) { const { proxy } = getAppURLFactsFromRequestURL(req.originalUrl); let ipAddress; if (proxy === 'none') { ipAddress = req.socket.remoteAddress; } else if (proxy === 'apache') { ipAddress = req.get('X-Forwarded-For'); } invariant(ipAddress, 'could not determine requesting IP address'); return ipAddress; } function getSessionParameterInfoFromRequestBody( req: $Request, ): SessionParameterInfo { const body = (req.body: any); let sessionID = body.sessionID !== undefined || req.method !== 'GET' ? body.sessionID : null; if (sessionID === '') { sessionID = null; } const sessionIdentifierType = req.method === 'GET' || sessionID !== undefined ? sessionIdentifierTypes.BODY_SESSION_ID : sessionIdentifierTypes.COOKIE_ID; return { isSocket: false, sessionID, sessionIdentifierType, ipAddress: getRequestIPAddress(req), userAgent: req.get('User-Agent'), }; } async function fetchViewerForJSONRequest(req: $Request): Promise { assertSecureRequest(req); const sessionParameterInfo = getSessionParameterInfoFromRequestBody(req); const result = await fetchViewerFromRequestBody( req.body, sessionParameterInfo, ); return await handleFetchViewerResult(result); } async function fetchViewerForSocket( req: $Request, clientMessage: InitialClientSocketMessage, ): Promise { assertSecureRequest(req); const { sessionIdentification } = clientMessage.payload; const { sessionID } = sessionIdentification; const sessionParameterInfo = { isSocket: true, sessionID, sessionIdentifierType: sessionID !== undefined ? sessionIdentifierTypes.BODY_SESSION_ID : sessionIdentifierTypes.COOKIE_ID, ipAddress: getRequestIPAddress(req), userAgent: req.get('User-Agent'), }; const result = await fetchViewerFromRequestBody( clientMessage.payload.sessionIdentification, sessionParameterInfo, ); if (result.type === 'valid') { return result.viewer; } const anonymousViewerDataPromise: Promise = (async () => { const platformDetails = result.type === 'invalidated' ? result.platformDetails : null; const deviceToken = result.type === 'invalidated' ? result.deviceToken : null; return await createNewAnonymousCookie({ platformDetails, deviceToken, }); })(); const deleteCookiePromise = (async () => { if (result.type === 'invalidated') { await deleteCookie(result.cookieID); } })(); const [anonymousViewerData] = await Promise.all([ anonymousViewerDataPromise, deleteCookiePromise, ]); return createViewerForInvalidFetchViewerResult(result, anonymousViewerData); } async function handleFetchViewerResult( result: FetchViewerResult, inputPlatformDetails?: PlatformDetails, ) { if (result.type === 'valid') { return result.viewer; } let platformDetails: ?PlatformDetails = inputPlatformDetails; if (!platformDetails && result.type === 'invalidated') { platformDetails = result.platformDetails; } const deviceToken = result.type === 'invalidated' ? result.deviceToken : null; + const deleteCookiePromise = (async () => { + if (result.type === 'invalidated') { + await deleteCookie(result.cookieID); + } + })(); + const [anonymousViewerData] = await Promise.all([ createNewAnonymousCookie({ platformDetails, deviceToken }), - result.type === 'invalidated' ? deleteCookie(result.cookieID) : null, + deleteCookiePromise, ]); return createViewerForInvalidFetchViewerResult(result, anonymousViewerData); } function createViewerForInvalidFetchViewerResult( result: InvalidFetchViewerResult, anonymousViewerData: AnonymousViewerData, ): Viewer { const viewer = new Viewer({ ...anonymousViewerData, sessionIdentifierType: result.sessionParameterInfo.sessionIdentifierType, isSocket: result.sessionParameterInfo.isSocket, ipAddress: result.sessionParameterInfo.ipAddress, userAgent: result.sessionParameterInfo.userAgent, }); viewer.sessionChanged = true; // If cookieName is falsey, that tells us that there was no cookie specified // in the request, which means we can't be invalidating anything. if (result.cookieName) { viewer.cookieInvalidated = true; viewer.initialCookieName = result.cookieName; } return viewer; } function addSessionChangeInfoToResult( viewer: Viewer, res: $Response, result: Object, ) { let threadInfos = {}, userInfos = {}; if (result.cookieChange) { ({ threadInfos, userInfos } = result.cookieChange); } let sessionChange; if (viewer.cookieInvalidated) { sessionChange = ({ cookieInvalidated: true, threadInfos, userInfos: (values(userInfos).map(a => a): UserInfo[]), currentUserInfo: { anonymous: true, }, }: ServerSessionChange); } else { sessionChange = ({ cookieInvalidated: false, threadInfos, userInfos: (values(userInfos).map(a => a): UserInfo[]), }: ServerSessionChange); } sessionChange.cookie = viewer.cookiePairString; if (viewer.sessionIdentifierType === sessionIdentifierTypes.BODY_SESSION_ID) { sessionChange.sessionID = viewer.sessionID ? viewer.sessionID : null; } result.cookieChange = sessionChange; } type AnonymousCookieCreationParams = Shape<{ +platformDetails: ?PlatformDetails, +deviceToken: ?string, }>; const defaultPlatformDetails = {}; // The result of this function should not be passed directly to the Viewer // constructor. Instead, it should be passed to viewer.setNewCookie. There are // several fields on AnonymousViewerData that are not set by this function: // sessionIdentifierType, ipAddress, and userAgent. These parameters all depend // on the initial request. If the result of this function is passed to the // Viewer constructor directly, the resultant Viewer object will throw whenever // anybody attempts to access the relevant properties. async function createNewAnonymousCookie( params: AnonymousCookieCreationParams, ): Promise { const { platformDetails, deviceToken } = params; const { platform, ...versions } = platformDetails || defaultPlatformDetails; const versionsString = Object.keys(versions).length > 0 ? JSON.stringify(versions) : null; const time = Date.now(); const cookiePassword = crypto.randomBytes(32).toString('hex'); const cookieHash = getCookieHash(cookiePassword); const [[id]] = await Promise.all([ createIDs('cookies', 1), deviceToken ? clearDeviceToken(deviceToken) : undefined, ]); const cookieRow = [ id, cookieHash, null, platform, time, time, deviceToken, versionsString, ]; const query = SQL` INSERT INTO cookies(id, hash, user, platform, creation_time, last_used, device_token, versions) VALUES ${[cookieRow]} `; await dbQuery(query); return { loggedIn: false, id, platformDetails, deviceToken, cookieID: id, cookiePassword, cookieHash, sessionID: undefined, sessionInfo: null, cookieInsertedThisRequest: true, isScriptViewer: false, }; } type UserCookieCreationParams = { +platformDetails: PlatformDetails, +deviceToken?: ?string, +socialProof?: ?SIWESocialProof, +signedIdentityKeysBlob?: ?SignedIdentityKeysBlob, }; // The result of this function should never be passed directly to the Viewer // constructor. Instead, it should be passed to viewer.setNewCookie. There are // several fields on UserViewerData that are not set by this function: // sessionID, sessionIdentifierType, and ipAddress. These parameters all depend // on the initial request. If the result of this function is passed to the // Viewer constructor directly, the resultant Viewer object will throw whenever // anybody attempts to access the relevant properties. async function createNewUserCookie( userID: string, params: UserCookieCreationParams, ): Promise { const { platformDetails, deviceToken, socialProof, signedIdentityKeysBlob } = params; const { platform, ...versions } = platformDetails || defaultPlatformDetails; const versionsString = Object.keys(versions).length > 0 ? JSON.stringify(versions) : null; const time = Date.now(); const cookiePassword = crypto.randomBytes(32).toString('hex'); const cookieHash = getCookieHash(cookiePassword); const [[cookieID]] = await Promise.all([ createIDs('cookies', 1), deviceToken ? clearDeviceToken(deviceToken) : undefined, ]); const cookieRow = [ cookieID, cookieHash, userID, platform, time, time, deviceToken, versionsString, JSON.stringify(socialProof), signedIdentityKeysBlob ? JSON.stringify(signedIdentityKeysBlob) : null, ]; const query = SQL` INSERT INTO cookies(id, hash, user, platform, creation_time, last_used, device_token, versions, social_proof, signed_identity_keys) VALUES ${[cookieRow]} `; await dbQuery(query); return { loggedIn: true, id: userID, platformDetails, deviceToken, userID, cookieID, sessionID: undefined, sessionInfo: null, cookiePassword, cookieHash, cookieInsertedThisRequest: true, isScriptViewer: false, }; } // This gets called after createNewUserCookie and from websiteResponder. If the // Viewer's sessionIdentifierType is COOKIE_ID then the cookieID is used as the // session identifier; otherwise, a new ID is created for the session. async function setNewSession( viewer: Viewer, calendarQuery: CalendarQuery, initialLastUpdate: number, ): Promise { if (viewer.sessionIdentifierType !== sessionIdentifierTypes.COOKIE_ID) { const [sessionID] = await createIDs('sessions', 1); viewer.setSessionID(sessionID); } await createSession(viewer, calendarQuery, initialLastUpdate); } async function updateCookie(viewer: Viewer) { const time = Date.now(); const { cookieID, cookieHash, cookiePassword } = viewer; const updateObj: { [string]: string | number } = {}; updateObj.last_used = time; if (isBcryptHash(cookieHash)) { updateObj.hash = getCookieHash(cookiePassword); } const query = SQL` UPDATE cookies SET ${updateObj} WHERE id = ${cookieID} `; await dbQuery(query); } function addCookieToJSONResponse( viewer: Viewer, res: $Response, result: Object, expectCookieInvalidation: boolean, ) { if (expectCookieInvalidation) { viewer.cookieInvalidated = false; } if (!viewer.getData().cookieInsertedThisRequest) { handleAsyncPromise(updateCookie(viewer)); } if (viewer.sessionChanged) { addSessionChangeInfoToResult(viewer, res, result); } } function addCookieToHomeResponse( req: $Request, res: $Response, appURLFacts: AppURLFacts, ) { const { user, anonymous } = req.cookies; if (user) { res.cookie(cookieTypes.USER, user, getCookieOptions(appURLFacts)); } if (anonymous) { res.cookie(cookieTypes.ANONYMOUS, anonymous, getCookieOptions(appURLFacts)); } } function getCookieOptions(appURLFacts: AppURLFacts) { const { baseDomain, basePath, https } = appURLFacts; const domainAsURL = new url.URL(baseDomain); return { domain: domainAsURL.hostname, path: basePath, httpOnly: false, secure: https, maxAge: cookieLifetime, sameSite: 'Strict', }; } async function setCookieSignedIdentityKeysBlob( cookieID: string, signedIdentityKeysBlob: SignedIdentityKeysBlob, ) { const signedIdentityKeysStr = JSON.stringify(signedIdentityKeysBlob); const query = SQL` UPDATE cookies SET signed_identity_keys = ${signedIdentityKeysStr} WHERE id = ${cookieID} `; await dbQuery(query); } // Returns `true` if row with `id = cookieID` exists AND // `signed_identity_keys` is `NULL`. Otherwise, returns `false`. async function isCookieMissingSignedIdentityKeysBlob( cookieID: string, ): Promise { const query = SQL` SELECT signed_identity_keys FROM cookies WHERE id = ${cookieID} `; const [queryResult] = await dbQuery(query); return ( queryResult.length === 1 && queryResult[0].signed_identity_keys === null ); } async function isCookieMissingOlmNotificationsSession( viewer: Viewer, ): Promise { const isStaffOrDev = isStaff(viewer.userID) || isDev; if ( !viewer.platformDetails || (viewer.platformDetails.platform !== 'ios' && viewer.platformDetails.platform !== 'android' && !(viewer.platformDetails.platform === 'web' && isStaffOrDev)) || !hasMinCodeVersion(viewer.platformDetails, { native: 222, web: 43, }) ) { return false; } const query = SQL` SELECT COUNT(*) AS count FROM olm_sessions WHERE cookie_id = ${viewer.cookieID} AND is_content = FALSE `; const [queryResult] = await dbQuery(query); return queryResult[0].count === 0; } async function setCookiePlatform( viewer: Viewer, platform: Platform, ): Promise { const newPlatformDetails = { ...viewer.platformDetails, platform }; viewer.setPlatformDetails(newPlatformDetails); const query = SQL` UPDATE cookies SET platform = ${platform} WHERE id = ${viewer.cookieID} `; await dbQuery(query); } async function setCookiePlatformDetails( viewer: Viewer, platformDetails: PlatformDetails, ): Promise { viewer.setPlatformDetails(platformDetails); const { platform, ...versions } = platformDetails; const versionsString = Object.keys(versions).length > 0 ? JSON.stringify(versions) : null; const query = SQL` UPDATE cookies SET platform = ${platform}, versions = ${versionsString} WHERE id = ${viewer.cookieID} `; await dbQuery(query); } export { fetchViewerForJSONRequest, fetchViewerForSocket, createNewAnonymousCookie, createNewUserCookie, setNewSession, updateCookie, addCookieToJSONResponse, addCookieToHomeResponse, setCookieSignedIdentityKeysBlob, isCookieMissingSignedIdentityKeysBlob, setCookiePlatform, setCookiePlatformDetails, isCookieMissingOlmNotificationsSession, }; diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index 773858d27..eebef70e8 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,885 +1,886 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce.js'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import WebSocket from 'ws'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { isStaff } from 'lib/shared/staff-utils.js'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { Shape } from 'lib/types/core.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; import type { RawEntryInfo } from 'lib/types/entry-types.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; import { serverRequestTypes } from 'lib/types/request-types.js'; import { sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types.js'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; import type { RawThreadInfos } from 'lib/types/thread-types.js'; import type { UserInfo, CurrentUserInfo } from 'lib/types/user-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; import { RedisSubscriber } from './redis.js'; import { clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers.js'; import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; import { newEntryQueryInputValidator, verifyCalendarQueryThreadIDs, } from '../responders/entry-responders.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { fetchViewerForSocket, updateCookie, isCookieMissingSignedIdentityKeysBlob, isCookieMissingOlmNotificationsSession, createNewAnonymousCookie, } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import type { AnonymousViewerData } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; import { compressMessage } from '../utils/compress.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { checkInputValidator, checkClientSupported, policiesValidator, validateOutput, } from '../utils/validation-utils.js'; const clientSocketMessageInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: newEntryQueryInputValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.maybe(t.Object), }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; const minVersionsForCompression = { native: 265, web: 30, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, ) => { invariant(typeof messageString === 'string', 'message should be string'); let clientSocketMessage: ?ClientSocketMessage; try { this.resetTimeout(); const messageObject = JSON.parse(messageString); clientSocketMessage = checkInputValidator( clientSocketMessageInputValidator, messageObject, ); if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessage, ); } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessage, ); await policiesValidator(viewer, baseLegalPolicies); const serverResponses = await this.handleClientSocketMessage( clientSocketMessage, ); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { handleAsyncPromise(updateCookie(viewer)); } for (const response of serverResponses) { // Normally it's an anti-pattern to await in sequence like this. But in // this case, we have a requirement that this array of serverResponses // is delivered in order. See here: // https://github.com/CommE2E/comm/blob/101eb34481deb49c609bfd2c785f375886e52666/keyserver/src/socket/socket.js#L566-L568 await this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; const responseTo = clientSocketMessage ? clientSocketMessage.id : null; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); await this.sendMessage(errorMessage); return; } invariant(clientSocketMessage, 'should be set'); const responseTo = clientSocketMessage.id; if (error.message === 'socket_deauthorized') { invariant(this.viewer, 'should be set'); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: this.viewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const anonymousViewerDataPromise: Promise = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); const deleteCookiePromise = deleteCookie(viewer.cookieID); const [anonymousViewerData] = await Promise.all([ anonymousViewerDataPromise, deleteCookiePromise, ]); // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: anonViewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = async (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState !== 1) { return; } const { viewer } = this; const validatedMessage = validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, message, ); const stringMessage = JSON.stringify(validatedMessage); if ( !viewer?.platformDetails || !hasMinCodeVersion(viewer.platformDetails, minVersionsForCompression) || !isStaff(viewer.id) ) { this.ws.send(stringMessage); return; } const compressionResult = await compressMessage(stringMessage); if (this.ws.readyState !== 1) { return; } if (!compressionResult.compressed) { this.ws.send(stringMessage); return; } const compressedMessage = { type: serverSocketMessageTypes.COMPRESSED_MESSAGE, payload: compressionResult.result, }; const validatedCompressedMessage = validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, compressedMessage, ); const stringCompressedMessage = JSON.stringify(validatedCompressedMessage); this.ws.send(stringCompressedMessage); }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { - const resultPromise = (async () => { + const resultPromise: Promise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); - const timeoutPromise = (async () => { + const timeoutPromise: Promise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses: Array = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors: { [string]: null } = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const [fetchMessagesResult, { serverRequests, activityUpdateResult }] = await Promise.all([ fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; const isCookieMissingSignedIdentityKeysBlobPromise = isCookieMissingSignedIdentityKeysBlob(viewer.cookieID); const isCookieMissingOlmNotificationsSessionPromise = isCookieMissingOlmNotificationsSession(viewer); if (!sessionInitializationResult.sessionContinued) { const promises: { +[string]: Promise } = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), ]), ); // We have a type error here because Flow doesn't know spec.hashKey const castPromises: { +threadInfos: Promise, +currentUserInfo: Promise, +entryInfos: Promise<$ReadOnlyArray>, +userInfos: Promise<$ReadOnlyArray>, } = (promises: any); const results = await promiseAll(castPromises); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: results.threadInfos, currentUserInfo: results.currentUserInfo, rawEntryInfos: results.entryInfos, userInfos: results.userInfos, updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters, // sessionIdentifierTypes.BODY_SESSION_ID but the session // is unspecified or expired, // it will set a new sessionID and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; - const promises = {}; - promises.deleteExpiredUpdates = deleteUpdatesBeforeTimeTargetingSession( - viewer, - oldUpdatesCurrentAsOf, - ); - promises.fetchUpdateResult = fetchUpdateInfos( + const deleteExpiredUpdatesPromise = + deleteUpdatesBeforeTimeTargetingSession(viewer, oldUpdatesCurrentAsOf); + const fetchUpdateResultPromise = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); - promises.sessionUpdate = commitSessionUpdate(viewer, sessionUpdate); - const { fetchUpdateResult } = await promiseAll(promises); + const sessionUpdatePromise = commitSessionUpdate(viewer, sessionUpdate); + const [fetchUpdateResult] = await Promise.all([ + fetchUpdateResultPromise, + deleteExpiredUpdatesPromise, + sessionUpdatePromise, + ]); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } const [signedIdentityKeysBlobMissing, olmNotificationsSessionMissing] = await Promise.all([ isCookieMissingSignedIdentityKeysBlobPromise, isCookieMissingOlmNotificationsSessionPromise, ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({ type: serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, }); } if (olmNotificationsSessionMissing) { serverRequests.push({ type: serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; await policiesValidator(viewer, responder.requiredPolicies); const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called resetTimeout = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); debouncedAfterActivity = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Shape) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } get stateCheckCanStart() { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { this.initiateStateCheck(); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState(viewer, { status: 'state_check', }); invariant(checkStateRequest, 'should be set'); await this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection }; diff --git a/keyserver/src/updaters/thread-permission-updaters.js b/keyserver/src/updaters/thread-permission-updaters.js index 6fbac2085..a1fec8055 100644 --- a/keyserver/src/updaters/thread-permission-updaters.js +++ b/keyserver/src/updaters/thread-permission-updaters.js @@ -1,1399 +1,1405 @@ // @flow import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; import bots from 'lib/facts/bots.js'; import genesis from 'lib/facts/genesis.js'; import { specialRoles } from 'lib/permissions/special-roles.js'; import { makePermissionsBlob, makePermissionsForChildrenBlob, getRoleForPermissions, } from 'lib/permissions/thread-permissions.js'; import type { CalendarQuery } from 'lib/types/entry-types.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import type { ThreadPermissionsBlob, ThreadRolePermissionsBlob, } from 'lib/types/thread-permission-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { type ThreadType, assertThreadType, } from 'lib/types/thread-types-enum.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { type ServerUpdateInfo, type CreateUpdatesResult, type UpdateData, } from 'lib/types/update-types.js'; import { pushAll } from 'lib/utils/array.js'; import { ServerError } from 'lib/utils/errors.js'; import { updateChangedUndirectedRelationships } from './relationship-updaters.js'; import { createUpdates, type UpdatesForCurrentSession, } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchServerThreadInfos, rawThreadInfosFromServerThreadInfos, } from '../fetchers/thread-fetchers.js'; import { rescindPushNotifs } from '../push/rescind.js'; import { createScriptViewer } from '../session/scripts.js'; import type { Viewer } from '../session/viewer.js'; import { updateRoles } from '../updaters/role-updaters.js'; import DepthQueue from '../utils/depth-queue.js'; import RelationshipChangeset from '../utils/relationship-changeset.js'; export type MembershipRowToSave = { +operation: 'save', +intent: 'join' | 'leave' | 'none', +userID: string, +threadID: string, +userNeedsFullThreadDetails: boolean, +permissions: ?ThreadPermissionsBlob, +permissionsForChildren: ?ThreadPermissionsBlob, // null role represents by "0" +role: string, +oldRole: string, +unread?: boolean, }; type MembershipRowToDelete = { +operation: 'delete', +intent: 'join' | 'leave' | 'none', +userID: string, +threadID: string, +oldRole: string, }; export type MembershipRow = MembershipRowToSave | MembershipRowToDelete; export type MembershipChangeset = { +membershipRows: MembershipRow[], +relationshipChangeset: RelationshipChangeset, }; // 0 role means to remove the user from the thread // null role means to set the user to the default role // string role means to set the user to the role with that ID // -1 role means to set the user as a "ghost" (former member) type ChangeRoleOptions = { +setNewMembersToUnread?: boolean, +forcePermissionRecalculation?: boolean, }; type ChangeRoleMemberInfo = { permissionsFromParent?: ?ThreadPermissionsBlob, memberOfContainingThread?: boolean, }; async function changeRole( threadID: string, userIDs: $ReadOnlyArray, role: string | -1 | 0 | null, options?: ChangeRoleOptions, ): Promise { const intent = role === -1 || role === 0 ? 'leave' : 'join'; const setNewMembersToUnread = options?.setNewMembersToUnread && intent === 'join'; const forcePermissionRecalculation = options?.forcePermissionRecalculation; if (userIDs.length === 0) { return { membershipRows: [], relationshipChangeset: new RelationshipChangeset(), }; } const membershipQuery = SQL` SELECT user, role, permissions, permissions_for_children FROM memberships WHERE thread = ${threadID} `; const parentMembershipQuery = SQL` SELECT pm.user, pm.permissions_for_children AS permissions_from_parent FROM threads t INNER JOIN memberships pm ON pm.thread = t.parent_thread_id WHERE t.id = ${threadID} AND (pm.user IN (${userIDs}) OR t.parent_thread_id != ${genesis.id}) `; const containingMembershipQuery = SQL` SELECT cm.user, cm.role AS containing_role FROM threads t INNER JOIN memberships cm ON cm.thread = t.containing_thread_id WHERE t.id = ${threadID} AND cm.user IN (${userIDs}) `; + const containingMembershipPromise: Promise< + $ReadOnlyArray<{ + +user: number, + +containing_role: number, + }>, + > = (async () => { + if (intent === 'leave') { + // Membership in the container only needs to be checked for members + return []; + } + const [result] = await dbQuery(containingMembershipQuery); + return result; + })(); const [ [membershipResults], [parentMembershipResults], containingMembershipResults, roleThreadResult, ] = await Promise.all([ dbQuery(membershipQuery), dbQuery(parentMembershipQuery), - (async () => { - if (intent === 'leave') { - // Membership in the container only needs to be checked for members - return []; - } - const [result] = await dbQuery(containingMembershipQuery); - return result; - })(), + containingMembershipPromise, changeRoleThreadQuery(threadID, role), ]); const { roleColumnValue: intendedRole, threadType, parentThreadID, hasContainingThreadID, rolePermissions: intendedRolePermissions, depth, } = roleThreadResult; const existingMembershipInfo = new Map(); for (const row of membershipResults) { const userID = row.user.toString(); existingMembershipInfo.set(userID, { oldRole: row.role.toString(), oldPermissions: JSON.parse(row.permissions), oldPermissionsForChildren: JSON.parse(row.permissions_for_children), }); } const ancestorMembershipInfo: Map = new Map(); for (const row of parentMembershipResults) { const userID = row.user.toString(); if (!userIDs.includes(userID)) { continue; } ancestorMembershipInfo.set(userID, { permissionsFromParent: JSON.parse(row.permissions_from_parent), }); } for (const row of containingMembershipResults) { const userID = row.user.toString(); const ancestorMembership = ancestorMembershipInfo.get(userID); const memberOfContainingThread = row.containing_role > 0; if (ancestorMembership) { ancestorMembership.memberOfContainingThread = memberOfContainingThread; } else { ancestorMembershipInfo.set(userID, { memberOfContainingThread, }); } } const relationshipChangeset = new RelationshipChangeset(); const existingMemberIDs = [...existingMembershipInfo.keys()]; if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const parentMemberIDs = parentMembershipResults.map(row => row.user.toString(), ); if (parentThreadID && parentThreadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(parentMemberIDs); } const membershipRows: Array = []; const toUpdateDescendants = new Map(); for (const userID of userIDs) { const existingMembership = existingMembershipInfo.get(userID); const oldRole = existingMembership?.oldRole ?? '-1'; const oldPermissions = existingMembership?.oldPermissions ?? null; const oldPermissionsForChildren = existingMembership?.oldPermissionsForChildren ?? null; if ( existingMembership && oldRole === intendedRole && !forcePermissionRecalculation ) { // If the old role is the same as the new one, we have nothing to update continue; } else if (Number(oldRole) > 0 && role === null) { // In the case where we're just trying to add somebody to a thread, if // they already have a role with a nonzero role then we don't need to do // anything continue; } let permissionsFromParent = null; let memberOfContainingThread = false; const ancestorMembership = ancestorMembershipInfo.get(userID); if (ancestorMembership) { permissionsFromParent = ancestorMembership.permissionsFromParent; memberOfContainingThread = ancestorMembership.memberOfContainingThread; } if (!hasContainingThreadID) { memberOfContainingThread = true; } const rolePermissions = memberOfContainingThread ? intendedRolePermissions : null; const targetRole = memberOfContainingThread ? intendedRole : '-1'; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userBecameMember = Number(oldRole) <= 0 && Number(newRole) > 0; const userLostMembership = Number(oldRole) > 0 && Number(newRole) <= 0; if ( (intent === 'join' && Number(newRole) <= 0) || (intent === 'leave' && Number(newRole) > 0) ) { throw new ServerError('invalid_parameters'); } else if (intendedRole !== newRole) { console.warn( `changeRole called for role=${intendedRole}, but ended up setting ` + `role=${newRole} for userID ${userID} and threadID ${threadID}, ` + 'probably because KNOW_OF permission was unexpectedly present or ' + 'missing', ); } if ( existingMembership && _isEqual(permissions)(oldPermissions) && oldRole === newRole ) { // This thread and all of its descendants need no updates for this user, // since the corresponding memberships row is unchanged by this operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent, userID, threadID, userNeedsFullThreadDetails: userBecameMember, permissions, permissionsForChildren, role: newRole, oldRole, unread: userBecameMember && setNewMembersToUnread, }); } else { membershipRows.push({ operation: 'delete', intent, userID, threadID, oldRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { relationshipChangeset.setRelationshipsNeeded(userID, existingMemberIDs); } if ( userLostMembership || !_isEqual(permissionsForChildren)(oldPermissionsForChildren) ) { toUpdateDescendants.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (toUpdateDescendants.size > 0) { const { membershipRows: descendantMembershipRows, relationshipChangeset: descendantRelationshipChangeset, } = await updateDescendantPermissions({ threadID, depth, changesByUser: toUpdateDescendants, }); pushAll(membershipRows, descendantMembershipRows); relationshipChangeset.addAll(descendantRelationshipChangeset); } return { membershipRows, relationshipChangeset }; } type RoleThreadResult = { +roleColumnValue: string, +depth: number, +threadType: ThreadType, +parentThreadID: ?string, +hasContainingThreadID: boolean, +rolePermissions: ?ThreadRolePermissionsBlob, }; async function changeRoleThreadQuery( threadID: string, role: string | -1 | 0 | null, ): Promise { if (role === 0 || role === -1) { const query = SQL` SELECT type, depth, parent_thread_id, containing_thread_id FROM threads WHERE id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: role.toString(), depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: null, }; } else if (role !== null) { const query = SQL` SELECT t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions FROM threads t INNER JOIN roles r ON r.thread = t.id AND r.id = ${role} WHERE t.id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: role, depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: JSON.parse(row.permissions), }; } else { const query = SQL` SELECT t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions, r.id FROM threads t INNER JOIN roles r ON r.thread = t.id AND r.special_role = ${specialRoles.DEFAULT_ROLE} WHERE t.id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: row.id.toString(), depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: JSON.parse(row.permissions), }; } } type ChangedAncestor = { +threadID: string, +depth: number, +changesByUser: Map, }; type AncestorChanges = { +userIsMember: boolean, +permissionsForChildren: ?ThreadPermissionsBlob, }; async function updateDescendantPermissions( initialChangedAncestor: ChangedAncestor, ): Promise { const membershipRows: Array = []; const relationshipChangeset = new RelationshipChangeset(); const initialDescendants = await fetchDescendantsForUpdate([ initialChangedAncestor, ]); const depthQueue = new DepthQueue( getDescendantDepth, getDescendantKey, mergeDescendants, ); depthQueue.addInfos(initialDescendants); let descendants; while ((descendants = depthQueue.getNextDepth())) { const descendantsAsAncestors = []; for (const descendant of descendants) { const { threadID, threadType, depth, users } = descendant; const existingMembers = [...users.entries()]; const existingMemberIDs = existingMembers .filter(([, { curRole }]) => curRole) .map(([userID]) => userID); if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const usersForNextLayer = new Map(); for (const [userID, user] of users) { const { curRolePermissions, curPermissionsFromParent, curMemberOfContainingThread, nextMemberOfContainingThread, nextPermissionsFromParent, potentiallyNeedsUpdate, } = user; const existingMembership = !!user.curRole; const curRole = user.curRole ?? '-1'; const curPermissions = user.curPermissions ?? null; const curPermissionsForChildren = user.curPermissionsForChildren ?? null; if (!potentiallyNeedsUpdate) { continue; } const permissionsFromParent = nextPermissionsFromParent === undefined ? curPermissionsFromParent : nextPermissionsFromParent; const memberOfContainingThread = nextMemberOfContainingThread === undefined ? curMemberOfContainingThread : nextMemberOfContainingThread; const targetRole = memberOfContainingThread ? curRole : '-1'; const rolePermissions = memberOfContainingThread ? curRolePermissions : null; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userLostMembership = Number(curRole) > 0 && Number(newRole) <= 0; if (_isEqual(permissions)(curPermissions) && curRole === newRole) { // This thread and all of its descendants need no updates for this // user, since the corresponding memberships row is unchanged by this // operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent: 'none', userID, threadID, userNeedsFullThreadDetails: false, permissions, permissionsForChildren, role: newRole, oldRole: curRole, }); } else { membershipRows.push({ operation: 'delete', intent: 'none', userID, threadID, oldRole: curRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { // If there was no membership row before, and we are creating one, // we'll need to make sure the new member has a relationship row with // each existing member. We expect that whoever called us already // generated memberships row for the new members, will will lead // saveMemberships to generate relationships rows between those new // users. relationshipChangeset.setRelationshipsNeeded( userID, existingMemberIDs, ); } if ( userLostMembership || !_isEqual(permissionsForChildren)(curPermissionsForChildren) ) { usersForNextLayer.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (usersForNextLayer.size > 0) { descendantsAsAncestors.push({ threadID, depth, changesByUser: usersForNextLayer, }); } } const nextDescendants = await fetchDescendantsForUpdate( descendantsAsAncestors, ); depthQueue.addInfos(nextDescendants); } return { membershipRows, relationshipChangeset }; } type DescendantUserInfo = $Shape<{ curRole?: string, curRolePermissions?: ?ThreadRolePermissionsBlob, curPermissions?: ?ThreadPermissionsBlob, curPermissionsForChildren?: ?ThreadPermissionsBlob, curPermissionsFromParent?: ?ThreadPermissionsBlob, curMemberOfContainingThread?: boolean, nextPermissionsFromParent?: ?ThreadPermissionsBlob, nextMemberOfContainingThread?: boolean, potentiallyNeedsUpdate?: boolean, }>; type DescendantInfo = { +threadID: string, +parentThreadID: string, +containingThreadID: string, +threadType: ThreadType, +depth: number, +users: Map, }; const fetchDescendantsBatchSize = 10; async function fetchDescendantsForUpdate( ancestors: $ReadOnlyArray, ): Promise { const threadIDs = ancestors.map(ancestor => ancestor.threadID); const rows: Array<{ +id: number, +user: number, +type: number, +depth: number, +parent_thread_id: number, +containing_thread_id: number, +role_permissions: string, +permissions: string, +permissions_for_children: string, +role: number, +permissions_from_parent: string | null, +containing_role: ?number, }> = []; while (threadIDs.length > 0) { const batch = threadIDs.splice(0, fetchDescendantsBatchSize); const query = SQL` SELECT t.id, m.user, t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions AS role_permissions, m.permissions, m.permissions_for_children, m.role, pm.permissions_for_children AS permissions_from_parent, cm.role AS containing_role FROM threads t INNER JOIN memberships m ON m.thread = t.id LEFT JOIN memberships pm ON pm.thread = t.parent_thread_id AND pm.user = m.user LEFT JOIN memberships cm ON cm.thread = t.containing_thread_id AND cm.user = m.user LEFT JOIN roles r ON r.id = m.role WHERE t.parent_thread_id IN (${batch}) OR t.containing_thread_id IN (${batch}) `; const [results] = await dbQuery(query); pushAll(rows, results); } const descendantThreadInfos: Map = new Map(); for (const row of rows) { const descendantThreadID = row.id.toString(); if (!descendantThreadInfos.has(descendantThreadID)) { descendantThreadInfos.set(descendantThreadID, { threadID: descendantThreadID, parentThreadID: row.parent_thread_id.toString(), containingThreadID: row.containing_thread_id.toString(), threadType: assertThreadType(row.type), depth: row.depth, users: new Map(), }); } const descendantThreadInfo = descendantThreadInfos.get(descendantThreadID); invariant( descendantThreadInfo, `value should exist for key ${descendantThreadID}`, ); const userID = row.user.toString(); descendantThreadInfo.users.set(userID, { curRole: row.role.toString(), curRolePermissions: JSON.parse(row.role_permissions), curPermissions: JSON.parse(row.permissions), curPermissionsForChildren: JSON.parse(row.permissions_for_children), curPermissionsFromParent: row.permissions_from_parent ? JSON.parse(row.permissions_from_parent) : null, curMemberOfContainingThread: !!row.containing_role && row.containing_role > 0, }); } for (const ancestor of ancestors) { const { threadID, changesByUser } = ancestor; for (const [userID, changes] of changesByUser) { for (const descendantThreadInfo of descendantThreadInfos.values()) { const { users, parentThreadID, containingThreadID } = descendantThreadInfo; if (threadID !== parentThreadID && threadID !== containingThreadID) { continue; } let user: ?DescendantUserInfo = users.get(userID); if (!user) { user = ({}: DescendantUserInfo); users.set(userID, user); } if (threadID === parentThreadID) { user.nextPermissionsFromParent = changes.permissionsForChildren; user.potentiallyNeedsUpdate = true; } if (threadID === containingThreadID) { user.nextMemberOfContainingThread = changes.userIsMember; if (!user.nextMemberOfContainingThread) { user.potentiallyNeedsUpdate = true; } } } } } return [...descendantThreadInfos.values()]; } function getDescendantDepth(descendant: DescendantInfo): number { return descendant.depth; } function getDescendantKey(descendant: DescendantInfo): string { return descendant.threadID; } function mergeDescendants( a: DescendantInfo, b: DescendantInfo, ): DescendantInfo { const { users: usersA, ...restA } = a; const { users: usersB, ...restB } = b; if (!_isEqual(restA)(restB)) { console.warn( `inconsistent descendantInfos ${JSON.stringify(restA)}, ` + JSON.stringify(restB), ); throw new ServerError('internal_error'); } const newUsers = new Map(usersA); for (const [userID, userFromB] of usersB) { const userFromA = newUsers.get(userID); if (!userFromA) { newUsers.set(userID, userFromB); } else { newUsers.set(userID, { ...userFromA, ...userFromB }); } } return { ...a, users: newUsers }; } type RecalculatePermissionsMemberInfo = { role?: ?string, permissions?: ?ThreadPermissionsBlob, permissionsForChildren?: ?ThreadPermissionsBlob, rolePermissions?: ?ThreadRolePermissionsBlob, memberOfContainingThread?: boolean, permissionsFromParent?: ?ThreadPermissionsBlob, }; async function recalculateThreadPermissions( threadID: string, ): Promise { const threadQuery = SQL` SELECT type, depth, parent_thread_id, containing_thread_id FROM threads WHERE id = ${threadID} `; const membershipQuery = SQL` SELECT m.user, m.role, m.permissions, m.permissions_for_children, r.permissions AS role_permissions, cm.role AS containing_role FROM threads t INNER JOIN memberships m ON m.thread = t.id LEFT JOIN roles r ON r.id = m.role LEFT JOIN memberships cm ON cm.user = m.user AND cm.thread = t.containing_thread_id WHERE t.id = ${threadID} `; const parentMembershipQuery = SQL` SELECT pm.user, pm.permissions_for_children AS permissions_from_parent FROM threads t INNER JOIN memberships pm ON pm.thread = t.parent_thread_id WHERE t.id = ${threadID} `; const [[threadResults], [membershipResults], [parentMembershipResults]] = await Promise.all([ dbQuery(threadQuery), dbQuery(membershipQuery), dbQuery(parentMembershipQuery), ]); if (threadResults.length !== 1) { throw new ServerError('internal_error'); } const [threadResult] = threadResults; const threadType = assertThreadType(threadResult.type); const depth = threadResult.depth; const hasContainingThreadID = threadResult.containing_thread_id !== null; const parentThreadID = threadResult.parent_thread_id?.toString(); const membershipInfo: Map = new Map(); for (const row of membershipResults) { const userID = row.user.toString(); membershipInfo.set(userID, { role: row.role.toString(), permissions: JSON.parse(row.permissions), permissionsForChildren: JSON.parse(row.permissions_for_children), rolePermissions: JSON.parse(row.role_permissions), memberOfContainingThread: !!( row.containing_role && row.containing_role > 0 ), }); } for (const row of parentMembershipResults) { const userID = row.user.toString(); const permissionsFromParent = JSON.parse(row.permissions_from_parent); const membership = membershipInfo.get(userID); if (membership) { membership.permissionsFromParent = permissionsFromParent; } else { membershipInfo.set(userID, { permissionsFromParent: permissionsFromParent, }); } } const relationshipChangeset = new RelationshipChangeset(); const existingMemberIDs = membershipResults.map(row => row.user.toString()); if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const parentMemberIDs = parentMembershipResults.map(row => row.user.toString(), ); if (parentThreadID && parentThreadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(parentMemberIDs); } const membershipRows: Array = []; const toUpdateDescendants = new Map(); for (const [userID, membership] of membershipInfo) { const { rolePermissions: intendedRolePermissions, permissionsFromParent } = membership; const oldPermissions = membership?.permissions ?? null; const oldPermissionsForChildren = membership?.permissionsForChildren ?? null; const existingMembership = membership.role !== undefined; const oldRole = membership.role ?? '-1'; const memberOfContainingThread = hasContainingThreadID ? !!membership.memberOfContainingThread : true; const targetRole = memberOfContainingThread ? oldRole : '-1'; const rolePermissions = memberOfContainingThread ? intendedRolePermissions : null; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userLostMembership = Number(oldRole) > 0 && Number(newRole) <= 0; if (_isEqual(permissions)(oldPermissions) && oldRole === newRole) { // This thread and all of its descendants need no updates for this user, // since the corresponding memberships row is unchanged by this operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent: 'none', userID, threadID, userNeedsFullThreadDetails: false, permissions, permissionsForChildren, role: newRole, oldRole, }); } else { membershipRows.push({ operation: 'delete', intent: 'none', userID, threadID, oldRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { // If there was no membership row before, and we are creating one, // we'll need to make sure the new member has a relationship row with // each existing member. We handle guaranteeing that new members have // relationship rows with each other in saveMemberships. relationshipChangeset.setRelationshipsNeeded(userID, existingMemberIDs); } if ( userLostMembership || !_isEqual(permissionsForChildren)(oldPermissionsForChildren) ) { toUpdateDescendants.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (toUpdateDescendants.size > 0) { const { membershipRows: descendantMembershipRows, relationshipChangeset: descendantRelationshipChangeset, } = await updateDescendantPermissions({ threadID, depth, changesByUser: toUpdateDescendants, }); pushAll(membershipRows, descendantMembershipRows); relationshipChangeset.addAll(descendantRelationshipChangeset); } return { membershipRows, relationshipChangeset }; } const defaultSubscriptionString = JSON.stringify({ home: false, pushNotifs: false, }); const joinSubscriptionString = JSON.stringify({ home: true, pushNotifs: true }); const membershipInsertBatchSize = 50; const visibleExtractString = `$.${threadPermissions.VISIBLE}.value`; async function saveMemberships({ toSave, updateMembershipsLastMessage, }: { toSave: $ReadOnlyArray, updateMembershipsLastMessage: boolean, }) { if (toSave.length === 0) { return; } const time = Date.now(); const insertRows = []; for (const rowToSave of toSave) { insertRows.push([ rowToSave.userID, rowToSave.threadID, rowToSave.role, time, rowToSave.intent === 'join' ? joinSubscriptionString : defaultSubscriptionString, rowToSave.permissions ? JSON.stringify(rowToSave.permissions) : null, rowToSave.permissionsForChildren ? JSON.stringify(rowToSave.permissionsForChildren) : null, rowToSave.unread ? 1 : 0, 0, ]); } // Logic below will only update an existing membership row's `subscription` // column if the user is either joining or leaving the thread. That means // there's no way to use this function to update a user's subscription without // also making them join or leave the thread. The reason we do this is because // we need to specify a value for `subscription` here, as it's a non-null // column and this is an INSERT, but we don't want to require people to have // to know the current `subscription` when they're just using this function to // update the permissions of an existing membership row. while (insertRows.length > 0) { const batch = insertRows.splice(0, membershipInsertBatchSize); const query = SQL` INSERT INTO memberships (user, thread, role, creation_time, subscription, permissions, permissions_for_children, last_message, last_read_message) VALUES ${batch} ON DUPLICATE KEY UPDATE subscription = IF( (role <= 0 AND VALUE(role) > 0) OR (role > 0 AND VALUE(role) <= 0), VALUE(subscription), subscription ), role = VALUE(role), permissions = VALUE(permissions), permissions_for_children = VALUE(permissions_for_children) `; await dbQuery(query); } if (!updateMembershipsLastMessage) { return; } const joinRows = toSave .filter(row => row.intent === 'join') .map(row => [row.userID, row.threadID, row.unread]); if (joinRows.length === 0) { return; } const joinedUserThreadPairs = joinRows.map(([user, thread]) => [ user, thread, ]); const unreadUserThreadPairs = joinRows .filter(([, , unread]) => !!unread) .map(([user, thread]) => [user, thread]); let lastReadMessageExpression; if (unreadUserThreadPairs.length === 0) { lastReadMessageExpression = SQL` GREATEST(COALESCE(all_users_query.message, 0), COALESCE(last_subthread_message_for_user_query.message, 0)) `; } else { lastReadMessageExpression = SQL` (CASE WHEN ((mm.user, mm.thread) in (${unreadUserThreadPairs})) THEN 0 ELSE GREATEST(COALESCE(all_users_query.message, 0), COALESCE(last_subthread_message_for_user_query.message, 0)) END) `; } // We join two subqueries with the memberships table: // - the first subquery calculates the oldest non-CREATE_SUB_THREAD // message, which is the same for all users // - the second subquery calculates the oldest CREATE_SUB_THREAD messages, // which can be different for each user because of visibility permissions // Then we set the `last_message` column to the greater value of the two. // For `last_read_message` we do the same but only if the user should have // the "unread" status set for this thread. const query = SQL` UPDATE memberships mm LEFT JOIN ( SELECT thread, MAX(id) AS message FROM messages WHERE type != ${messageTypes.CREATE_SUB_THREAD} GROUP BY thread ) all_users_query ON mm.thread = all_users_query.thread LEFT JOIN ( SELECT m.thread, stm.user, MAX(m.id) AS message FROM messages m LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content WHERE JSON_EXTRACT(stm.permissions, ${visibleExtractString}) IS TRUE GROUP BY m.thread, stm.user ) last_subthread_message_for_user_query ON mm.thread = last_subthread_message_for_user_query.thread AND mm.user = last_subthread_message_for_user_query.user SET mm.last_message = GREATEST(COALESCE(all_users_query.message, 0), COALESCE(last_subthread_message_for_user_query.message, 0)), mm.last_read_message = `; query.append(lastReadMessageExpression); query.append(SQL`WHERE (mm.user, mm.thread) IN (${joinedUserThreadPairs});`); await dbQuery(query); } async function deleteMemberships( toDelete: $ReadOnlyArray, ) { if (toDelete.length === 0) { return; } const time = Date.now(); const insertRows = toDelete.map(rowToDelete => [ rowToDelete.userID, rowToDelete.threadID, -1, time, defaultSubscriptionString, null, null, 0, 0, ]); while (insertRows.length > 0) { const batch = insertRows.splice(0, membershipInsertBatchSize); const query = SQL` INSERT INTO memberships (user, thread, role, creation_time, subscription, permissions, permissions_for_children, last_message, last_read_message) VALUES ${batch} ON DUPLICATE KEY UPDATE role = -1, permissions = NULL, permissions_for_children = NULL, subscription = ${defaultSubscriptionString}, last_message = 0, last_read_message = 0 `; await dbQuery(query); } } const emptyCommitMembershipChangesetConfig = Object.freeze({}); // Specify non-empty changedThreadIDs to force updates to be generated for those // threads, presumably for reasons not covered in the changeset. calendarQuery // only needs to be specified if a JOIN_THREAD update will be generated for the // viewer, in which case it's necessary for knowing the set of entries to fetch. type ChangesetCommitResult = { ...CreateUpdatesResult, }; async function commitMembershipChangeset( viewer: Viewer, changeset: MembershipChangeset, { changedThreadIDs = new Set(), calendarQuery, updatesForCurrentSession = 'return', updateMembershipsLastMessage = false, }: { +changedThreadIDs?: Set, +calendarQuery?: ?CalendarQuery, +updatesForCurrentSession?: UpdatesForCurrentSession, +updateMembershipsLastMessage?: boolean, } = emptyCommitMembershipChangesetConfig, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const { membershipRows, relationshipChangeset } = changeset; const membershipRowMap = new Map(); for (const row of membershipRows) { const { userID, threadID } = row; changedThreadIDs.add(threadID); const pairString = `${userID}|${threadID}`; const existing = membershipRowMap.get(pairString); invariant( !existing || existing.intent === 'none' || row.intent === 'none', `multiple intents provided for ${pairString}`, ); if (!existing || existing.intent === 'none') { membershipRowMap.set(pairString, row); } } const toSave = [], toDelete = [], toRescindPushNotifs = []; for (const row of membershipRowMap.values()) { if ( row.operation === 'delete' || (row.operation === 'save' && Number(row.role) <= 0) ) { const { userID, threadID } = row; toRescindPushNotifs.push({ userID, threadID }); } if (row.operation === 'delete') { toDelete.push(row); } else { toSave.push(row); } } const threadsToSavedUsers = new Map(); for (const row of membershipRowMap.values()) { const { userID, threadID } = row; let savedUsers = threadsToSavedUsers.get(threadID); if (!savedUsers) { savedUsers = []; threadsToSavedUsers.set(threadID, savedUsers); } savedUsers.push(userID); } for (const [threadID, savedUsers] of threadsToSavedUsers) { if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsNeeded(savedUsers); } } const relationshipRows = relationshipChangeset.getRows(); const [updateDatas] = await Promise.all([ updateChangedUndirectedRelationships(relationshipRows), saveMemberships({ toSave, updateMembershipsLastMessage }), deleteMemberships(toDelete), rescindPushNotifsForMemberDeletion(toRescindPushNotifs), ]); const serverThreadInfoFetchResult = await fetchServerThreadInfos({ threadIDs: changedThreadIDs, }); const { threadInfos: serverThreadInfos } = serverThreadInfoFetchResult; const time = Date.now(); for (const changedThreadID of changedThreadIDs) { const serverThreadInfo = serverThreadInfos[changedThreadID]; for (const memberInfo of serverThreadInfo.members) { const pairString = `${memberInfo.id}|${serverThreadInfo.id}`; const membershipRow = membershipRowMap.get(pairString); if (membershipRow) { continue; } updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID: memberInfo.id, time, threadID: changedThreadID, }); } } for (const row of membershipRowMap.values()) { const { userID, threadID } = row; if (row.operation === 'delete' || row.role === '-1') { if (row.oldRole !== '-1') { updateDatas.push({ type: updateTypes.DELETE_THREAD, userID, time, threadID, }); } } else if (row.userNeedsFullThreadDetails) { updateDatas.push({ type: updateTypes.JOIN_THREAD, userID, time, threadID, }); } else { updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID, time, threadID, }); } } const threadInfoFetchResult = rawThreadInfosFromServerThreadInfos( viewer, serverThreadInfoFetchResult, ); const { viewerUpdates, userInfos } = await createUpdates(updateDatas, { viewer, calendarQuery, ...threadInfoFetchResult, updatesForCurrentSession, }); return { userInfos, viewerUpdates, }; } const emptyGetChangesetCommitResultConfig = Object.freeze({}); // When the user tries to create a new thread, it's possible for the client to // fail the creation even if a row gets added to the threads table. This may // occur due to a timeout (on either the client or server side), or due to some // error in the server code following the INSERT operation. Handling the error // scenario is more challenging since it would require detecting which set of // operations failed so we could retry them. As a result, this code is geared at // only handling the timeout scenario. async function getChangesetCommitResultForExistingThread( viewer: Viewer, threadID: string, otherUpdates: $ReadOnlyArray, { calendarQuery, updatesForCurrentSession = 'return', }: { +calendarQuery?: ?CalendarQuery, +updatesForCurrentSession?: UpdatesForCurrentSession, } = emptyGetChangesetCommitResultConfig, ): Promise { for (const update of otherUpdates) { if ( update.type === updateTypes.JOIN_THREAD && update.threadInfo.id === threadID ) { // If the JOIN_THREAD is already there we can expect // the appropriate UPDATE_USERs to be covered as well return { viewerUpdates: otherUpdates, userInfos: {} }; } } const time = Date.now(); const updateDatas: Array = [ { type: updateTypes.JOIN_THREAD, userID: viewer.userID, time, threadID, targetSession: viewer.session, }, ]; // To figure out what UserInfos might be missing, we consider the worst case: // the same client previously attempted to create a thread with a non-friend // they found via search results, but the request timed out. In this scenario // the viewer might never have received the UPDATE_USER that would add that // UserInfo to their UserStore, but the server assumed the client had gotten // it because createUpdates was called with UpdatesForCurrentSession=return. // For completeness here we query for the full list of memberships rows in the // thread. We can't use fetchServerThreadInfos because it skips role=-1 rows const membershipsQuery = SQL` SELECT user FROM memberships WHERE thread = ${threadID} AND user != ${viewer.userID} `; const [results] = await dbQuery(membershipsQuery); for (const row of results) { updateDatas.push({ type: updateTypes.UPDATE_USER, userID: viewer.userID, time, updatedUserID: row.user.toString(), targetSession: viewer.session, }); } const { viewerUpdates, userInfos } = await createUpdates(updateDatas, { viewer, calendarQuery, updatesForCurrentSession, }); return { viewerUpdates: [...otherUpdates, ...viewerUpdates], userInfos }; } const rescindPushNotifsBatchSize = 3; async function rescindPushNotifsForMemberDeletion( toRescindPushNotifs: $ReadOnlyArray<{ +userID: string, +threadID: string }>, ): Promise { const queue = [...toRescindPushNotifs]; while (queue.length > 0) { const batch = queue.splice(0, rescindPushNotifsBatchSize); await Promise.all( batch.map(({ userID, threadID }) => rescindPushNotifs( SQL`n.thread = ${threadID} AND n.user = ${userID}`, SQL`IF(m.thread = ${threadID}, NULL, m.thread)`, ), ), ); } } // Deprecated - use updateRolesAndPermissionsForAllThreads instead async function DEPRECATED_recalculateAllThreadPermissions() { const getAllThreads = SQL`SELECT id FROM threads`; const [result] = await dbQuery(getAllThreads); // We handle each thread one-by-one to avoid a situation where a permission // calculation for a child thread, done during a call to // recalculateThreadPermissions for the parent thread, can be incorrectly // overriden by a call to recalculateThreadPermissions for the child thread. // If the changeset resulting from the parent call isn't committed before the // calculation is done for the child, the calculation done for the child can // be incorrect. const viewer = createScriptViewer(bots.commbot.userID); for (const row of result) { const threadID = row.id.toString(); const changeset = await recalculateThreadPermissions(threadID); await commitMembershipChangeset(viewer, changeset); } } async function updateRolesAndPermissionsForAllThreads() { const batchSize = 10; const fetchThreads = SQL`SELECT id, type, depth FROM threads`; const [result] = await dbQuery(fetchThreads); const allThreads = result.map(row => { return { id: row.id.toString(), type: assertThreadType(row.type), depth: row.depth, }; }); const viewer = createScriptViewer(bots.commbot.userID); const maxDepth = Math.max(...allThreads.map(row => row.depth)); for (let depth = 0; depth <= maxDepth; depth++) { const threads = allThreads.filter(row => row.depth === depth); console.log(`recalculating permissions for threads with depth ${depth}`); while (threads.length > 0) { const batch = threads.splice(0, batchSize); const membershipRows: Array = []; const relationshipChangeset = new RelationshipChangeset(); await Promise.all( batch.map(async thread => { console.log(`updating roles for ${thread.id}`); await updateRoles(viewer, thread.id, thread.type); console.log(`recalculating permissions for ${thread.id}`); const { membershipRows: threadMembershipRows, relationshipChangeset: threadRelationshipChangeset, } = await recalculateThreadPermissions(thread.id); membershipRows.push(...threadMembershipRows); relationshipChangeset.addAll(threadRelationshipChangeset); }), ); console.log(`committing batch ${JSON.stringify(batch)}`); await commitMembershipChangeset(viewer, { membershipRows, relationshipChangeset, }); } } } export { changeRole, recalculateThreadPermissions, getChangesetCommitResultForExistingThread, saveMemberships, commitMembershipChangeset, DEPRECATED_recalculateAllThreadPermissions, updateRolesAndPermissionsForAllThreads, }; diff --git a/keyserver/src/updaters/thread-updaters.js b/keyserver/src/updaters/thread-updaters.js index 2527fbbf4..5b3700cce 100644 --- a/keyserver/src/updaters/thread-updaters.js +++ b/keyserver/src/updaters/thread-updaters.js @@ -1,975 +1,983 @@ // @flow import { specialRoles } from 'lib/permissions/special-roles.js'; import { getRolePermissionBlobs } from 'lib/permissions/thread-permissions.js'; import { filteredThreadIDs } from 'lib/selectors/calendar-filter-selectors.js'; import { getPinnedContentFromMessage } from 'lib/shared/message-utils.js'; import { threadHasAdminRole, roleIsAdminRole, viewerIsMember, getThreadTypeParentRequirement, } from 'lib/shared/thread-utils.js'; import type { Shape } from 'lib/types/core.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import type { RawMessageInfo, MessageData } from 'lib/types/message-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { threadTypes } from 'lib/types/thread-types-enum.js'; import { type RoleChangeRequest, type ChangeThreadSettingsResult, type RemoveMembersRequest, type LeaveThreadRequest, type LeaveThreadResult, type UpdateThreadRequest, type ServerThreadJoinRequest, type ThreadJoinResult, type ToggleMessagePinRequest, type ToggleMessagePinResult, } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { ServerError } from 'lib/utils/errors.js'; import { promiseAll } from 'lib/utils/promises.js'; import { firstLine } from 'lib/utils/string-utils.js'; import { canToggleMessagePin } from 'lib/utils/toggle-pin-utils.js'; import { validChatNameRegex } from 'lib/utils/validation-utils.js'; import { reportLinkUsage } from './link-updaters.js'; import { updateRoles } from './role-updaters.js'; import { changeRole, recalculateThreadPermissions, commitMembershipChangeset, + type MembershipChangeset, type MembershipRow, } from './thread-permission-updaters.js'; import createMessages from '../creators/message-creator.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { checkIfInviteLinkIsValid } from '../fetchers/link-fetchers.js'; import { fetchMessageInfoByID } from '../fetchers/message-fetchers.js'; import { fetchThreadInfos, fetchServerThreadInfos, determineThreadAncestry, rawThreadInfosFromServerThreadInfos, } from '../fetchers/thread-fetchers.js'; import { checkThreadPermission, viewerIsMember as fetchViewerIsMember, checkThread, validateCandidateMembers, } from '../fetchers/thread-permission-fetchers.js'; import { verifyUserIDs, verifyUserOrCookieIDs, } from '../fetchers/user-fetchers.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import type { Viewer } from '../session/viewer.js'; import RelationshipChangeset from '../utils/relationship-changeset.js'; type UpdateRoleOptions = { +silenceNewMessages?: boolean, +forcePermissionRecalculation?: boolean, }; async function updateRole( viewer: Viewer, request: RoleChangeRequest, options?: UpdateRoleOptions, ): Promise { const silenceNewMessages = options?.silenceNewMessages; const forcePermissionRecalculation = options?.forcePermissionRecalculation; if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [memberIDs, hasPermission, fetchThreadResult] = await Promise.all([ verifyUserIDs(request.memberIDs), checkThreadPermission( viewer, request.threadID, threadPermissions.CHANGE_ROLE, ), fetchThreadInfos(viewer, { threadID: request.threadID }), ]); if (memberIDs.length === 0) { throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_credentials'); } const threadInfo = fetchThreadResult.threadInfos[request.threadID]; if (!threadInfo) { throw new ServerError('invalid_parameters'); } const adminRoleID = Object.keys(threadInfo.roles).find( roleID => threadInfo.roles[roleID].name === 'Admins', ); // Ensure that there will always still be at least one admin in a community if (adminRoleID) { const memberRoles = memberIDs.map( memberID => threadInfo.members.find(member => member.id === memberID)?.role, ); const communityAdminsCount = threadInfo.members.filter( member => member.role === adminRoleID, ).length; const changedAdminsCount = memberRoles.filter( memberRole => memberRole === adminRoleID, ).length; if (changedAdminsCount >= communityAdminsCount) { throw new ServerError('invalid_parameters'); } } const query = SQL` SELECT user, role FROM memberships WHERE user IN (${memberIDs}) AND thread = ${request.threadID} `; const [result] = await dbQuery(query); let nonMemberUser = false; let numResults = 0; for (const row of result) { if (row.role <= 0) { nonMemberUser = true; break; } numResults++; } if (nonMemberUser || numResults < memberIDs.length) { throw new ServerError('invalid_parameters'); } const changeset = await changeRole( request.threadID, memberIDs, request.role, { forcePermissionRecalculation: !!forcePermissionRecalculation, }, ); const { viewerUpdates } = await commitMembershipChangeset( viewer, changeset, forcePermissionRecalculation ? { changedThreadIDs: new Set([request.threadID]) } : undefined, ); let newMessageInfos: Array = []; if (!silenceNewMessages) { const messageData = { type: messageTypes.CHANGE_ROLE, threadID: request.threadID, creatorID: viewer.userID, time: Date.now(), userIDs: memberIDs, newRole: request.role, roleName: threadInfo.roles[request.role].name, }; newMessageInfos = await createMessages(viewer, [messageData]); } return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } async function removeMembers( viewer: Viewer, request: RemoveMembersRequest, ): Promise { const viewerID = viewer.userID; if (request.memberIDs.includes(viewerID)) { throw new ServerError('invalid_parameters'); } const [memberIDs, hasPermission] = await Promise.all([ verifyUserOrCookieIDs(request.memberIDs), checkThreadPermission( viewer, request.threadID, threadPermissions.REMOVE_MEMBERS, ), ]); if (memberIDs.length === 0) { throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_credentials'); } const query = SQL` SELECT m.user, m.role, r.id AS default_role FROM memberships m LEFT JOIN roles r ON r.special_role = ${specialRoles.DEFAULT_ROLE} AND r.thread = ${request.threadID} WHERE m.user IN (${memberIDs}) AND m.thread = ${request.threadID} `; const [result] = await dbQuery(query); let nonDefaultRoleUser = false; const actualMemberIDs = []; for (const row of result) { if (row.role <= 0) { continue; } actualMemberIDs.push(row.user.toString()); if (row.role !== row.default_role) { nonDefaultRoleUser = true; } } if (nonDefaultRoleUser) { const hasChangeRolePermission = await checkThreadPermission( viewer, request.threadID, threadPermissions.CHANGE_ROLE, ); if (!hasChangeRolePermission) { throw new ServerError('invalid_credentials'); } } const changeset = await changeRole(request.threadID, actualMemberIDs, 0); const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset); const newMessageInfos = await (async () => { if (actualMemberIDs.length === 0) { return []; } const messageData = { type: messageTypes.REMOVE_MEMBERS, threadID: request.threadID, creatorID: viewerID, time: Date.now(), removedUserIDs: actualMemberIDs, }; return await createMessages(viewer, [messageData]); })(); return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } async function leaveThread( viewer: Viewer, request: LeaveThreadRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [fetchThreadResult, hasPermission] = await Promise.all([ fetchThreadInfos(viewer, { threadID: request.threadID }), checkThreadPermission( viewer, request.threadID, threadPermissions.LEAVE_THREAD, ), ]); const threadInfo = fetchThreadResult.threadInfos[request.threadID]; if (!viewerIsMember(threadInfo)) { return { updatesResult: { newUpdates: [] }, }; } if (!hasPermission) { throw new ServerError('invalid_parameters'); } const viewerID = viewer.userID; if (threadHasAdminRole(threadInfo)) { let otherUsersExist = false; let otherAdminsExist = false; for (const member of threadInfo.members) { const role = member.role; if (!role || member.id === viewerID) { continue; } otherUsersExist = true; if (roleIsAdminRole(threadInfo.roles[role])) { otherAdminsExist = true; break; } } if (otherUsersExist && !otherAdminsExist) { throw new ServerError('invalid_parameters'); } } const changeset = await changeRole(request.threadID, [viewerID], 0); const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset); const messageData = { type: messageTypes.LEAVE_THREAD, threadID: request.threadID, creatorID: viewerID, time: Date.now(), }; await createMessages(viewer, [messageData]); return { updatesResult: { newUpdates: viewerUpdates } }; } type UpdateThreadOptions = Shape<{ +forceAddMembers: boolean, +forceUpdateRoot: boolean, +silenceMessages: boolean, +ignorePermissions: boolean, }>; async function updateThread( viewer: Viewer, request: UpdateThreadRequest, options?: UpdateThreadOptions, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const forceAddMembers = options?.forceAddMembers ?? false; const forceUpdateRoot = options?.forceUpdateRoot ?? false; const silenceMessages = options?.silenceMessages ?? false; const ignorePermissions = (options?.ignorePermissions && viewer.isScriptViewer) ?? false; - const validationPromises = {}; const changedFields: { [string]: string | number } = {}; const sqlUpdate: { [string]: ?string | number } = {}; const untrimmedName = request.changes.name; if (untrimmedName !== undefined && untrimmedName !== null) { const name = firstLine(untrimmedName); if (name.search(validChatNameRegex) === -1) { throw new ServerError('invalid_chat_name'); } changedFields.name = name; - sqlUpdate.name = name ?? null; + sqlUpdate.name = name; } const { description } = request.changes; if (description !== undefined && description !== null) { changedFields.description = description; - sqlUpdate.description = description ?? null; + sqlUpdate.description; } if (request.changes.color) { const color = request.changes.color.toLowerCase(); changedFields.color = color; sqlUpdate.color = color; } const { parentThreadID } = request.changes; if (parentThreadID !== undefined) { // TODO some sort of message when this changes sqlUpdate.parent_thread_id = parentThreadID; } const { avatar } = request.changes; if (avatar) { changedFields.avatar = avatar.type !== 'remove' ? JSON.stringify(avatar) : ''; sqlUpdate.avatar = avatar.type !== 'remove' ? JSON.stringify(avatar) : null; } const threadType = request.changes.type; if (threadType !== null && threadType !== undefined) { changedFields.type = threadType; sqlUpdate.type = threadType; } if ( !ignorePermissions && threadType !== null && threadType !== undefined && threadType !== threadTypes.COMMUNITY_OPEN_SUBTHREAD && threadType !== threadTypes.COMMUNITY_SECRET_SUBTHREAD ) { throw new ServerError('invalid_parameters'); } const newMemberIDs = request.changes.newMemberIDs && request.changes.newMemberIDs.length > 0 ? [...new Set(request.changes.newMemberIDs)] : null; if ( Object.keys(sqlUpdate).length === 0 && !newMemberIDs && !forceUpdateRoot ) { throw new ServerError('invalid_parameters'); } - validationPromises.serverThreadInfos = fetchServerThreadInfos({ + const serverThreadInfosPromise = fetchServerThreadInfos({ threadID: request.threadID, }); - validationPromises.hasNecessaryPermissions = (async () => { + const hasNecessaryPermissionsPromise = (async () => { if (ignorePermissions) { return; } const checks = []; if (sqlUpdate.name !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_NAME, }); } if (sqlUpdate.description !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_DESCRIPTION, }); } if (sqlUpdate.color !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_COLOR, }); } if (sqlUpdate.avatar !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_AVATAR, }); } if (parentThreadID !== undefined || sqlUpdate.type !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_PERMISSIONS, }); } if (newMemberIDs) { checks.push({ check: 'permission', permission: threadPermissions.ADD_MEMBERS, }); } const hasNecessaryPermissions = await checkThread( viewer, request.threadID, checks, ); if (!hasNecessaryPermissions) { throw new ServerError('invalid_credentials'); } })(); - const { serverThreadInfos } = await promiseAll(validationPromises); + const [serverThreadInfos] = await Promise.all([ + serverThreadInfosPromise, + hasNecessaryPermissionsPromise, + ]); const serverThreadInfo = serverThreadInfos.threadInfos[request.threadID]; if (!serverThreadInfo) { throw new ServerError('internal_error'); } // Threads with source message should be visible to everyone, but we can't // guarantee it for COMMUNITY_SECRET_SUBTHREAD threads so we forbid it for // now. In the future, if we want to support this, we would need to unlink the // source message. if ( threadType !== null && threadType !== undefined && threadType !== threadTypes.SIDEBAR && threadType !== threadTypes.COMMUNITY_OPEN_SUBTHREAD && serverThreadInfo.sourceMessageID ) { throw new ServerError('invalid_parameters'); } // You can't change the parent thread of a current or former SIDEBAR if (parentThreadID !== undefined && serverThreadInfo.sourceMessageID) { throw new ServerError('invalid_parameters'); } const oldThreadType = serverThreadInfo.type; const oldParentThreadID = serverThreadInfo.parentThreadID; const oldContainingThreadID = serverThreadInfo.containingThreadID; const oldCommunity = serverThreadInfo.community; const oldDepth = serverThreadInfo.depth; const nextThreadType = threadType !== null && threadType !== undefined ? threadType : oldThreadType; let nextParentThreadID = parentThreadID !== undefined ? parentThreadID : oldParentThreadID; // Does the new thread type preclude a parent? if ( threadType !== undefined && threadType !== null && getThreadTypeParentRequirement(threadType) === 'disabled' && nextParentThreadID !== null ) { nextParentThreadID = null; sqlUpdate.parent_thread_id = null; } // Does the new thread type require a parent? if ( threadType !== undefined && threadType !== null && getThreadTypeParentRequirement(threadType) === 'required' && nextParentThreadID === null ) { throw new ServerError('no_parent_thread_specified'); } const determineThreadAncestryPromise = determineThreadAncestry( nextParentThreadID, nextThreadType, ); const confirmParentPermissionPromise = (async () => { if (ignorePermissions || !nextParentThreadID) { return; } if ( nextParentThreadID === oldParentThreadID && (nextThreadType === threadTypes.SIDEBAR) === (oldThreadType === threadTypes.SIDEBAR) ) { return; } const hasParentPermission = await checkThreadPermission( viewer, nextParentThreadID, nextThreadType === threadTypes.SIDEBAR ? threadPermissions.CREATE_SIDEBARS : threadPermissions.CREATE_SUBCHANNELS, ); if (!hasParentPermission) { throw new ServerError('invalid_parameters'); } })(); const rolesNeedUpdate = forceUpdateRoot || nextThreadType !== oldThreadType; const validateNewMembersPromise = (async () => { if (!newMemberIDs || ignorePermissions) { return; } const defaultRolePermissionsPromise = (async () => { let rolePermissions; if (!rolesNeedUpdate) { const rolePermissionsQuery = SQL` SELECT r.permissions FROM threads t LEFT JOIN roles r ON r.special_role = ${specialRoles.DEFAULT_ROLE} AND r.thread = ${request.threadID} WHERE t.id = ${request.threadID} `; const [result] = await dbQuery(rolePermissionsQuery); if (result.length > 0) { rolePermissions = JSON.parse(result[0].permissions); } } if (!rolePermissions) { rolePermissions = getRolePermissionBlobs(nextThreadType).Members; } return rolePermissions; })(); const [defaultRolePermissions, nextThreadAncestry] = await Promise.all([ defaultRolePermissionsPromise, determineThreadAncestryPromise, ]); const { newMemberIDs: validatedIDs } = await validateCandidateMembers( viewer, { newMemberIDs }, { threadType: nextThreadType, parentThreadID: nextParentThreadID, containingThreadID: nextThreadAncestry.containingThreadID, defaultRolePermissions, }, { requireRelationship: !forceAddMembers }, ); if ( validatedIDs && Number(validatedIDs?.length) < Number(newMemberIDs?.length) ) { throw new ServerError('invalid_credentials'); } })(); const { nextThreadAncestry } = await promiseAll({ nextThreadAncestry: determineThreadAncestryPromise, confirmParentPermissionPromise, validateNewMembersPromise, }); if (nextThreadAncestry.containingThreadID !== oldContainingThreadID) { sqlUpdate.containing_thread_id = nextThreadAncestry.containingThreadID; } if (nextThreadAncestry.community !== oldCommunity) { if (!ignorePermissions) { throw new ServerError('invalid_parameters'); } sqlUpdate.community = nextThreadAncestry.community; } if (nextThreadAncestry.depth !== oldDepth) { sqlUpdate.depth = nextThreadAncestry.depth; } const updateQueryPromise = (async () => { if (Object.keys(sqlUpdate).length === 0) { return; } const { avatar: avatarUpdate, ...nonAvatarUpdates } = sqlUpdate; const updatePromises = []; if (Object.keys(nonAvatarUpdates).length > 0) { const nonAvatarUpdateQuery = SQL` UPDATE threads SET ${nonAvatarUpdates} WHERE id = ${request.threadID} `; updatePromises.push(dbQuery(nonAvatarUpdateQuery)); } if (avatarUpdate !== undefined) { const avatarUploadID = avatar && (avatar.type === 'image' || avatar.type === 'encrypted_image') ? avatar.uploadID : null; const avatarUpdateQuery = SQL` START TRANSACTION; UPDATE uploads SET container = NULL WHERE container = ${request.threadID} AND ( ${avatarUploadID} IS NULL OR EXISTS ( SELECT 1 FROM uploads WHERE id = ${avatarUploadID} AND ${avatarUploadID} IS NOT NULL AND uploader = ${viewer.userID} AND container IS NULL AND thread IS NULL ) ); UPDATE uploads SET container = ${request.threadID} WHERE id = ${avatarUploadID} AND ${avatarUploadID} IS NOT NULL AND uploader = ${viewer.userID} AND container IS NULL AND thread IS NULL; UPDATE threads SET avatar = ${avatarUpdate} WHERE id = ${request.threadID} AND ( ${avatarUploadID} IS NULL OR EXISTS ( SELECT 1 FROM uploads WHERE id = ${avatarUploadID} AND ${avatarUploadID} IS NOT NULL AND uploader = ${viewer.userID} AND container = ${request.threadID} AND thread IS NULL ) ); COMMIT; `; updatePromises.push( dbQuery(avatarUpdateQuery, { multipleStatements: true }), ); } await Promise.all(updatePromises); })(); const updateRolesPromise = (async () => { if (rolesNeedUpdate) { await updateRoles(viewer, request.threadID, nextThreadType); } })(); - const intermediatePromises = {}; - intermediatePromises.updateQuery = updateQueryPromise; - intermediatePromises.updateRoles = updateRolesPromise; - - if (newMemberIDs) { - intermediatePromises.addMembersChangeset = (async () => { + const addMembersChangesetPromise: Promise = + (async () => { + if (!newMemberIDs) { + return undefined; + } await Promise.all([updateQueryPromise, updateRolesPromise]); return await changeRole(request.threadID, newMemberIDs, null, { setNewMembersToUnread: true, }); })(); - } - const threadRootChanged = - rolesNeedUpdate || nextParentThreadID !== oldParentThreadID; - if (threadRootChanged) { - intermediatePromises.recalculatePermissionsChangeset = (async () => { + const recalculatePermissionsChangesetPromise: Promise = + (async () => { + const threadRootChanged = + rolesNeedUpdate || nextParentThreadID !== oldParentThreadID; + if (!threadRootChanged) { + return undefined; + } await Promise.all([updateQueryPromise, updateRolesPromise]); return await recalculateThreadPermissions(request.threadID); })(); - } - const { addMembersChangeset, recalculatePermissionsChangeset } = - await promiseAll(intermediatePromises); + const [addMembersChangeset, recalculatePermissionsChangeset] = + await Promise.all([ + addMembersChangesetPromise, + recalculatePermissionsChangesetPromise, + updateQueryPromise, + updateRolesPromise, + ]); const membershipRows: Array = []; const relationshipChangeset = new RelationshipChangeset(); if (recalculatePermissionsChangeset) { const { membershipRows: recalculateMembershipRows, relationshipChangeset: recalculateRelationshipChangeset, } = recalculatePermissionsChangeset; membershipRows.push(...recalculateMembershipRows); relationshipChangeset.addAll(recalculateRelationshipChangeset); } let addedMemberIDs; if (addMembersChangeset) { const { membershipRows: addMembersMembershipRows, relationshipChangeset: addMembersRelationshipChangeset, } = addMembersChangeset; addedMemberIDs = addMembersMembershipRows .filter( row => row.operation === 'save' && row.threadID === request.threadID && Number(row.role) > 0, ) .map(row => row.userID); membershipRows.push(...addMembersMembershipRows); relationshipChangeset.addAll(addMembersRelationshipChangeset); } const changeset = { membershipRows, relationshipChangeset }; const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset, { // This forces an update for this thread, // regardless of whether any membership rows are changed changedThreadIDs: Object.keys(sqlUpdate).length > 0 ? new Set([request.threadID]) : new Set(), // last_message will be updated automatically if we send a message, // so we only need to handle it here when we silence new messages updateMembershipsLastMessage: silenceMessages, }); let newMessageInfos: Array = []; if (!silenceMessages) { const time = Date.now(); const messageDatas: Array = []; for (const fieldName in changedFields) { const newValue = changedFields[fieldName]; messageDatas.push({ type: messageTypes.CHANGE_SETTINGS, threadID: request.threadID, creatorID: viewer.userID, time, field: fieldName, value: newValue, }); } if (addedMemberIDs && addedMemberIDs.length > 0) { messageDatas.push({ type: messageTypes.ADD_MEMBERS, threadID: request.threadID, creatorID: viewer.userID, time, addedUserIDs: addedMemberIDs, }); } newMessageInfos = await createMessages(viewer, messageDatas); } return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } async function joinThread( viewer: Viewer, request: ServerThreadJoinRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const permissionCheck = request.inviteLinkSecret ? checkIfInviteLinkIsValid(request.inviteLinkSecret, request.threadID) : checkThreadPermission( viewer, request.threadID, threadPermissions.JOIN_THREAD, ); const [isMember, hasPermission] = await Promise.all([ fetchViewerIsMember(viewer, request.threadID), permissionCheck, ]); if (!hasPermission) { throw new ServerError('invalid_parameters'); } const { calendarQuery } = request; if (isMember) { const response: ThreadJoinResult = { rawMessageInfos: [], truncationStatuses: {}, userInfos: {}, updatesResult: { newUpdates: [], }, }; return response; } if (calendarQuery) { const threadFilterIDs = filteredThreadIDs(calendarQuery.filters); if ( !threadFilterIDs || threadFilterIDs.size !== 1 || threadFilterIDs.values().next().value !== request.threadID ) { throw new ServerError('invalid_parameters'); } } const changeset = await changeRole(request.threadID, [viewer.userID], null); const membershipResult = await commitMembershipChangeset(viewer, changeset, { calendarQuery, }); if (request.inviteLinkSecret) { handleAsyncPromise(reportLinkUsage(request.inviteLinkSecret)); } const messageData = { type: messageTypes.JOIN_THREAD, threadID: request.threadID, creatorID: viewer.userID, time: Date.now(), }; const newMessages = await createMessages(viewer, [messageData]); return { rawMessageInfos: newMessages, truncationStatuses: {}, userInfos: membershipResult.userInfos, updatesResult: { newUpdates: membershipResult.viewerUpdates, }, }; } async function toggleMessagePinForThread( viewer: Viewer, request: ToggleMessagePinRequest, ): Promise { const { messageID, action } = request; const targetMessage = await fetchMessageInfoByID(viewer, messageID); if (!targetMessage) { throw new ServerError('invalid_parameters'); } const { threadID } = targetMessage; const fetchServerThreadInfosResult = await fetchServerThreadInfos({ threadID, }); const { threadInfos: rawThreadInfos } = rawThreadInfosFromServerThreadInfos( viewer, fetchServerThreadInfosResult, ); const rawThreadInfo = rawThreadInfos[threadID]; const canTogglePin = canToggleMessagePin(targetMessage, rawThreadInfo); if (!canTogglePin) { throw new ServerError('invalid_parameters'); } const pinnedValue = action === 'pin' ? 1 : 0; const pinTimeValue = action === 'pin' ? Date.now() : null; const pinnedCountValue = action === 'pin' ? 1 : -1; const query = SQL` UPDATE messages AS m, threads AS t SET m.pinned = ${pinnedValue}, m.pin_time = ${pinTimeValue}, t.pinned_count = t.pinned_count + ${pinnedCountValue} WHERE m.id = ${messageID} AND m.thread = ${threadID} AND t.id = ${threadID} AND m.pinned != ${pinnedValue} `; const [result] = await dbQuery(query); if (result.affectedRows === 0) { return { newMessageInfos: [], threadID, }; } const createMessagesAsync = async () => { const messageData = { type: messageTypes.TOGGLE_PIN, threadID, targetMessageID: messageID, action, pinnedContent: getPinnedContentFromMessage(targetMessage), creatorID: viewer.userID, time: Date.now(), }; const newMessageInfos = await createMessages(viewer, [messageData]); return newMessageInfos; }; const createUpdatesAsync = async () => { const { threadInfos: serverThreadInfos } = fetchServerThreadInfosResult; const time = Date.now(); const updates = []; for (const member of serverThreadInfos[threadID].members) { updates.push({ userID: member.id, time, threadID, type: updateTypes.UPDATE_THREAD, }); } await createUpdates(updates); }; const [newMessageInfos] = await Promise.all([ createMessagesAsync(), createUpdatesAsync(), ]); return { newMessageInfos, threadID, }; } export { updateRole, removeMembers, leaveThread, updateThread, joinThread, toggleMessagePinForThread, };