diff --git a/keyserver/src/creators/invite-link-creator.js b/keyserver/src/creators/invite-link-creator.js index 6f98680e0..3470a11c4 100644 --- a/keyserver/src/creators/invite-link-creator.js +++ b/keyserver/src/creators/invite-link-creator.js @@ -1,121 +1,121 @@ // @flow import type { CreateOrUpdatePublicLinkRequest, InviteLink, } from 'lib/types/link-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { ServerError } from 'lib/utils/errors.js'; import createIDs from './id-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchPrimaryInviteLinks } from '../fetchers/link-fetchers.js'; import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; import { checkThreadPermission } from '../fetchers/thread-permission-fetchers.js'; import { Viewer } from '../session/viewer.js'; const secretRegex = /^[a-zA-Z0-9]+$/; async function createOrUpdatePublicLink( viewer: Viewer, request: CreateOrUpdatePublicLinkRequest, ): Promise { if (!secretRegex.test(request.name)) { throw new ServerError('invalid_parameters'); } const permissionPromise = checkThreadPermission( viewer, request.communityID, threadPermissions.MANAGE_INVITE_LINKS, ); const existingPrimaryLinksPromise = fetchPrimaryInviteLinks(viewer); - const fetchThreadInfoPromise = fetchServerThreadInfos( - SQL`t.id = ${request.communityID}`, - ); + const fetchThreadInfoPromise = fetchServerThreadInfos({ + threadID: request.communityID, + }); const [hasPermission, existingPrimaryLinks, { threadInfos }] = await Promise.all([ permissionPromise, existingPrimaryLinksPromise, fetchThreadInfoPromise, ]); if (!hasPermission) { throw new ServerError('invalid_credentials'); } const threadInfo = threadInfos[request.communityID]; if (!threadInfo) { throw new ServerError('invalid_parameters'); } const defaultRoleID = Object.keys(threadInfo.roles).find( roleID => threadInfo.roles[roleID].isDefault, ); if (!defaultRoleID) { throw new ServerError('invalid_parameters'); } const existingPrimaryLink = existingPrimaryLinks.find( link => link.communityID === request.communityID && link.primary, ); if (existingPrimaryLink) { const query = SQL` UPDATE invite_links SET name = ${request.name} WHERE \`primary\` = 1 AND community = ${request.communityID} `; try { await dbQuery(query); } catch { throw new ServerError('invalid_parameters'); } return { name: request.name, primary: true, role: defaultRoleID, communityID: request.communityID, expirationTime: null, limitOfUses: null, numberOfUses: 0, }; } const [id] = await createIDs('invite_links', 1); const row = [id, request.name, true, request.communityID, defaultRoleID]; const createLinkQuery = SQL` INSERT INTO invite_links(id, name, \`primary\`, community, role) SELECT ${row} WHERE NOT EXISTS ( SELECT i.id FROM invite_links i WHERE i.\`primary\` = 1 AND i.community = ${request.communityID} ) `; let result = null; try { result = (await dbQuery(createLinkQuery))[0]; } catch { throw new ServerError('invalid_parameters'); } if (result.affectedRows === 0) { const deleteIDs = SQL` DELETE FROM ids WHERE id = ${id} `; await dbQuery(deleteIDs); throw new ServerError('invalid_parameters'); } return { name: request.name, primary: true, role: defaultRoleID, communityID: request.communityID, expirationTime: null, limitOfUses: null, numberOfUses: 0, }; } export { createOrUpdatePublicLink }; diff --git a/keyserver/src/creators/message-creator.js b/keyserver/src/creators/message-creator.js index 7c255a59f..5e43b13f5 100644 --- a/keyserver/src/creators/message-creator.js +++ b/keyserver/src/creators/message-creator.js @@ -1,696 +1,696 @@ // @flow import invariant from 'invariant'; import _pickBy from 'lodash/fp/pickBy.js'; import { permissionLookup } from 'lib/permissions/thread-permissions.js'; import { rawMessageInfoFromMessageData, shimUnsupportedRawMessageInfos, stripLocalIDs, } from 'lib/shared/message-utils.js'; import { pushTypes } from 'lib/shared/messages/message-spec.js'; import type { PushType } from 'lib/shared/messages/message-spec.js'; import { messageSpecs } from 'lib/shared/messages/message-specs.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import { messageDataLocalID, type MessageData, type RawMessageInfo, } from 'lib/types/message-types.js'; import { redisMessageTypes } from 'lib/types/redis-types.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { promiseAll } from 'lib/utils/promises.js'; import createIDs from './id-creator.js'; import type { UpdatesForCurrentSession } from './update-creator.js'; import { createUpdates } from './update-creator.js'; import { dbQuery, SQL, appendSQLArray, mergeOrConditions, } from '../database/database.js'; import { processMessagesForSearch } from '../database/search-utils.js'; import { fetchMessageInfoForLocalID, fetchMessageInfoByID, } from '../fetchers/message-fetchers.js'; import { fetchOtherSessionsForViewer } from '../fetchers/session-fetchers.js'; import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; import { sendPushNotifs, sendRescindNotifs } from '../push/send.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import type { Viewer } from '../session/viewer.js'; import { earliestFocusedTimeConsideredExpired } from '../shared/focused-times.js'; import { publisher } from '../socket/redis.js'; import { creationString } from '../utils/idempotent.js'; type UserThreadInfo = { +devices: Map< string, { +platform: string, +deviceToken: string, +cookieID: string, +codeVersion: ?string, }, >, +threadIDs: Set, +notFocusedThreadIDs: Set, +userNotMemberOfSubthreads: Set, +subthreadsCanSetToUnread: Set, }; type LatestMessagesPerUser = Map< string, $ReadOnlyMap< string, { +latestMessage: string, +latestReadMessage?: string, }, >, >; type LatestMessages = $ReadOnlyArray<{ +userID: string, +threadID: string, +latestMessage: string, +latestReadMessage: ?string, }>; // Does not do permission checks! (checkThreadPermission) async function createMessages( viewer: Viewer, messageDatas: $ReadOnlyArray, updatesForCurrentSession?: UpdatesForCurrentSession = 'return', ): Promise { if (messageDatas.length === 0) { return []; } const existingMessages = await Promise.all( messageDatas.map(messageData => fetchMessageInfoForLocalID(viewer, messageDataLocalID(messageData)), ), ); const existingMessageInfos: RawMessageInfo[] = []; const newMessageDatas: MessageData[] = []; for (let i = 0; i < messageDatas.length; i++) { const existingMessage = existingMessages[i]; if (existingMessage) { existingMessageInfos.push(existingMessage); } else { newMessageDatas.push(messageDatas[i]); } } if (newMessageDatas.length === 0) { return shimUnsupportedRawMessageInfos( existingMessageInfos, viewer.platformDetails, ); } const ids = await createIDs('messages', newMessageDatas.length); const returnMessageInfos: RawMessageInfo[] = []; const subthreadPermissionsToCheck: Set = new Set(); const messageInsertRows = []; // Indices in threadsToMessageIndices point to newMessageInfos const newMessageInfos: RawMessageInfo[] = []; const threadsToMessageIndices: Map = new Map(); let nextNewMessageIndex = 0; for (let i = 0; i < messageDatas.length; i++) { const existingMessage = existingMessages[i]; if (existingMessage) { returnMessageInfos.push(existingMessage); continue; } const messageData = messageDatas[i]; const threadID = messageData.threadID; const creatorID = messageData.creatorID; let messageIndices = threadsToMessageIndices.get(threadID); if (!messageIndices) { messageIndices = []; threadsToMessageIndices.set(threadID, messageIndices); } const newMessageIndex = nextNewMessageIndex++; messageIndices.push(newMessageIndex); const serverID = ids[newMessageIndex]; if (messageData.type === messageTypes.CREATE_SUB_THREAD) { subthreadPermissionsToCheck.add(messageData.childThreadID); } const content = messageSpecs[messageData.type].messageContentForServerDB?.(messageData); const creation = messageData.localID && viewer.hasSessionInfo ? creationString(viewer, messageData.localID) : null; let targetMessageID = null; if (messageData.targetMessageID) { targetMessageID = messageData.targetMessageID; } else if (messageData.sourceMessage) { targetMessageID = messageData.sourceMessage.id; } messageInsertRows.push([ serverID, threadID, creatorID, messageData.type, content, messageData.time, creation, targetMessageID, ]); const rawMessageInfo = rawMessageInfoFromMessageData(messageData, serverID); newMessageInfos.push(rawMessageInfo); // at newMessageIndex returnMessageInfos.push(rawMessageInfo); // at i } const messageInsertQuery = SQL` INSERT INTO messages(id, thread, user, type, content, time, creation, target_message) VALUES ${messageInsertRows} `; const messageInsertPromise = dbQuery(messageInsertQuery); const postMessageSendPromise = postMessageSend( viewer, threadsToMessageIndices, subthreadPermissionsToCheck, stripLocalIDs(newMessageInfos), newMessageDatas, updatesForCurrentSession, ); if (!viewer.isScriptViewer) { // If we're not being called from a script, then we avoid awaiting // postMessageSendPromise below so that we don't delay the response to the // user on external services. In that case, we use handleAsyncPromise to // make sure any exceptions are caught and logged. handleAsyncPromise(postMessageSendPromise); } await Promise.all([ messageInsertPromise, updateRepliesCount(threadsToMessageIndices, newMessageDatas), viewer.isScriptViewer ? postMessageSendPromise : undefined, ]); if (updatesForCurrentSession !== 'return') { return []; } return shimUnsupportedRawMessageInfos( returnMessageInfos, viewer.platformDetails, ); } async function updateRepliesCount( threadsToMessageIndices: Map, newMessageDatas: MessageData[], ) { const updatedThreads = []; const updateThreads = SQL` UPDATE threads SET replies_count = replies_count + (CASE `; const membershipConditions = []; for (const [threadID, messages] of threadsToMessageIndices.entries()) { const newRepliesIncrease = messages .map(i => newMessageDatas[i].type) .filter(type => messageSpecs[type].includedInRepliesCount).length; if (newRepliesIncrease === 0) { continue; } updateThreads.append(SQL` WHEN id = ${threadID} THEN ${newRepliesIncrease} `); updatedThreads.push(threadID); const senders = messages.map(i => newMessageDatas[i].creatorID); membershipConditions.push( SQL`thread = ${threadID} AND user IN (${senders})`, ); } updateThreads.append(SQL` ELSE 0 END) WHERE id IN (${updatedThreads}) AND source_message IS NOT NULL `); const updateMemberships = SQL` UPDATE memberships SET sender = 1 WHERE sender = 0 AND ( `; updateMemberships.append(mergeOrConditions(membershipConditions)); updateMemberships.append(SQL` ) `); if (updatedThreads.length > 0) { const [{ threadInfos: serverThreadInfos }] = await Promise.all([ - fetchServerThreadInfos(SQL`t.id IN (${updatedThreads})`), + fetchServerThreadInfos({ threadIDs: new Set(updatedThreads) }), dbQuery(updateThreads), dbQuery(updateMemberships), ]); const time = Date.now(); const updates = []; for (const threadID in serverThreadInfos) { for (const member of serverThreadInfos[threadID].members) { updates.push({ userID: member.id, time, threadID, type: updateTypes.UPDATE_THREAD, }); } } await createUpdates(updates); } } // Handles: // (1) Sending push notifs // (2) Setting threads to unread and generating corresponding UpdateInfos // (3) Publishing to Redis so that active sockets pass on new messages // (4) Processing messages for search async function postMessageSend( viewer: Viewer, threadsToMessageIndices: Map, subthreadPermissionsToCheck: Set, messageInfos: RawMessageInfo[], messageDatas: MessageData[], updatesForCurrentSession: UpdatesForCurrentSession, ) { const processForSearch = processMessagesForSearch(messageInfos); let joinIndex = 0; let subthreadSelects = ''; const subthreadJoins = []; for (const subthread of subthreadPermissionsToCheck) { const index = joinIndex++; subthreadSelects += ` , stm${index}.permissions AS subthread${subthread}_permissions, stm${index}.role AS subthread${subthread}_role `; const join = SQL`LEFT JOIN memberships `; join.append(`stm${index} ON stm${index}.`); join.append(SQL`thread = ${subthread} AND `); join.append(`stm${index}.user = m.user`); subthreadJoins.push(join); } const time = earliestFocusedTimeConsideredExpired(); const visibleExtractString = `$.${threadPermissions.VISIBLE}.value`; const query = SQL` SELECT m.user, m.thread, c.platform, c.device_token, c.versions, c.id, f.user AS focused_user `; query.append(subthreadSelects); query.append(SQL` FROM memberships m LEFT JOIN cookies c ON c.user = m.user AND c.device_token IS NOT NULL LEFT JOIN focused f ON f.user = m.user AND f.thread = m.thread AND f.time > ${time} `); appendSQLArray(query, subthreadJoins, SQL` `); query.append(SQL` WHERE (m.role > 0 OR f.user IS NOT NULL) AND JSON_EXTRACT(m.permissions, ${visibleExtractString}) IS TRUE AND m.thread IN (${[...threadsToMessageIndices.keys()]}) `); const perUserInfo = new Map(); const [result] = await dbQuery(query); for (const row of result) { const userID = row.user.toString(); const threadID = row.thread.toString(); const deviceToken = row.device_token; const focusedUser = !!row.focused_user; const { platform } = row; const versions = JSON.parse(row.versions); const cookieID = row.id; let thisUserInfo = perUserInfo.get(userID); if (!thisUserInfo) { thisUserInfo = { devices: new Map(), threadIDs: new Set(), notFocusedThreadIDs: new Set(), userNotMemberOfSubthreads: new Set(), subthreadsCanSetToUnread: new Set(), }; perUserInfo.set(userID, thisUserInfo); // Subthread info will be the same for each subthread, so we only parse // it once for (const subthread of subthreadPermissionsToCheck) { const isSubthreadMember = row[`subthread${subthread}_role`] > 0; const rawSubthreadPermissions = row[`subthread${subthread}_permissions`]; const subthreadPermissions = JSON.parse(rawSubthreadPermissions); const canSeeSubthread = permissionLookup( subthreadPermissions, threadPermissions.KNOW_OF, ); if (!canSeeSubthread) { continue; } thisUserInfo.subthreadsCanSetToUnread.add(subthread); // Only include the notification from the superthread if there is no // notification from the subthread if ( !isSubthreadMember || !permissionLookup(subthreadPermissions, threadPermissions.VISIBLE) ) { thisUserInfo.userNotMemberOfSubthreads.add(subthread); } } } if (deviceToken && cookieID) { thisUserInfo.devices.set(deviceToken, { platform, deviceToken, cookieID: cookieID.toString(), codeVersion: versions ? versions.codeVersion : null, }); } thisUserInfo.threadIDs.add(threadID); if (!focusedUser) { thisUserInfo.notFocusedThreadIDs.add(threadID); } } const messageInfosPerUser = {}; const latestMessagesPerUser: LatestMessagesPerUser = new Map(); const userPushInfoPromises = {}; const userRescindInfoPromises = {}; for (const pair of perUserInfo) { const [userID, preUserPushInfo] = pair; const userMessageInfos = []; for (const threadID of preUserPushInfo.threadIDs) { const messageIndices = threadsToMessageIndices.get(threadID); invariant(messageIndices, `indices should exist for thread ${threadID}`); for (const messageIndex of messageIndices) { const messageInfo = messageInfos[messageIndex]; userMessageInfos.push(messageInfo); } } if (userMessageInfos.length > 0) { messageInfosPerUser[userID] = userMessageInfos; } latestMessagesPerUser.set( userID, determineLatestMessagesPerThread( preUserPushInfo, userID, threadsToMessageIndices, messageInfos, ), ); const { userNotMemberOfSubthreads } = preUserPushInfo; const userDevices = [...preUserPushInfo.devices.values()]; if (userDevices.length === 0) { continue; } const generateNotifUserInfoPromise = async (pushType: PushType) => { const promises = []; for (const threadID of preUserPushInfo.notFocusedThreadIDs) { const messageIndices = threadsToMessageIndices.get(threadID); invariant( messageIndices, `indices should exist for thread ${threadID}`, ); promises.push( ...messageIndices.map(async messageIndex => { const messageInfo = messageInfos[messageIndex]; const { type } = messageInfo; if (messageInfo.creatorID === userID) { // We never send a user notifs about their own activity return undefined; } const { generatesNotifs } = messageSpecs[type]; const messageData = messageDatas[messageIndex]; if (!generatesNotifs) { return undefined; } const doesGenerateNotif = await generatesNotifs( messageInfo, messageData, { notifTargetUserID: userID, userNotMemberOfSubthreads, fetchMessageInfoByID: (messageID: string) => fetchMessageInfoByID(viewer, messageID), }, ); return doesGenerateNotif === pushType ? { messageInfo, messageData } : undefined; }), ); } const messagesToNotify = await Promise.all(promises); const filteredMessagesToNotify = messagesToNotify.filter(Boolean); if (filteredMessagesToNotify.length === 0) { return undefined; } return { devices: userDevices, messageInfos: filteredMessagesToNotify.map( ({ messageInfo }) => messageInfo, ), messageDatas: filteredMessagesToNotify.map( ({ messageData }) => messageData, ), }; }; const userPushInfoPromise = generateNotifUserInfoPromise(pushTypes.NOTIF); const userRescindInfoPromise = generateNotifUserInfoPromise( pushTypes.RESCIND, ); userPushInfoPromises[userID] = userPushInfoPromise; userRescindInfoPromises[userID] = userRescindInfoPromise; } const latestMessages = flattenLatestMessagesPerUser(latestMessagesPerUser); const [pushInfo, rescindInfo] = await Promise.all([ promiseAll(userPushInfoPromises), promiseAll(userRescindInfoPromises), createReadStatusUpdates(latestMessages), redisPublish(viewer, messageInfosPerUser, updatesForCurrentSession), updateLatestMessages(latestMessages), processForSearch, ]); await Promise.all([ sendPushNotifs(_pickBy(Boolean)(pushInfo)), sendRescindNotifs(_pickBy(Boolean)(rescindInfo)), ]); } async function redisPublish( viewer: Viewer, messageInfosPerUser: { [userID: string]: $ReadOnlyArray }, updatesForCurrentSession: UpdatesForCurrentSession, ) { const avoidBroadcastingToCurrentSession = viewer.hasSessionInfo && updatesForCurrentSession !== 'broadcast'; for (const userID in messageInfosPerUser) { if (userID === viewer.userID && avoidBroadcastingToCurrentSession) { continue; } const messageInfos = messageInfosPerUser[userID]; publisher.sendMessage( { userID }, { type: redisMessageTypes.NEW_MESSAGES, messages: messageInfos, }, ); } const viewerMessageInfos = messageInfosPerUser[viewer.userID]; if (!viewerMessageInfos || !avoidBroadcastingToCurrentSession) { return; } const sessionIDs = await fetchOtherSessionsForViewer(viewer); for (const sessionID of sessionIDs) { publisher.sendMessage( { userID: viewer.userID, sessionID }, { type: redisMessageTypes.NEW_MESSAGES, messages: viewerMessageInfos, }, ); } } function determineLatestMessagesPerThread( preUserPushInfo: UserThreadInfo, userID: string, threadsToMessageIndices: $ReadOnlyMap>, messageInfos: $ReadOnlyArray, ) { const { threadIDs, notFocusedThreadIDs, subthreadsCanSetToUnread } = preUserPushInfo; const latestMessagesPerThread = new Map(); for (const threadID of threadIDs) { const messageIndices = threadsToMessageIndices.get(threadID); invariant(messageIndices, `indices should exist for thread ${threadID}`); for (const messageIndex of messageIndices) { const messageInfo = messageInfos[messageIndex]; if ( messageInfo.type === messageTypes.CREATE_SUB_THREAD && !subthreadsCanSetToUnread.has(messageInfo.childThreadID) ) { continue; } const messageID = messageInfo.id; invariant( messageID, 'message ID should exist in determineLatestMessagesPerThread', ); if ( notFocusedThreadIDs.has(threadID) && messageInfo.creatorID !== userID ) { latestMessagesPerThread.set(threadID, { latestMessage: messageID, }); } else { latestMessagesPerThread.set(threadID, { latestMessage: messageID, latestReadMessage: messageID, }); } } } return latestMessagesPerThread; } function flattenLatestMessagesPerUser( latestMessagesPerUser: LatestMessagesPerUser, ): LatestMessages { const result = []; for (const [userID, latestMessagesPerThread] of latestMessagesPerUser) { for (const [threadID, latestMessages] of latestMessagesPerThread) { result.push({ userID, threadID, latestMessage: latestMessages.latestMessage, latestReadMessage: latestMessages.latestReadMessage, }); } } return result; } async function createReadStatusUpdates(latestMessages: LatestMessages) { const now = Date.now(); const readStatusUpdates = latestMessages .filter(message => !message.latestReadMessage) .map(({ userID, threadID }) => ({ type: updateTypes.UPDATE_THREAD_READ_STATUS, userID, time: now, threadID, unread: true, })); if (readStatusUpdates.length === 0) { return; } await createUpdates(readStatusUpdates); } function updateLatestMessages(latestMessages: LatestMessages) { if (latestMessages.length === 0) { return; } const query = SQL` UPDATE memberships SET `; const lastMessageExpression = SQL` last_message = GREATEST(last_message, CASE `; const lastReadMessageExpression = SQL` , last_read_message = GREATEST(last_read_message, CASE `; let shouldUpdateLastReadMessage = false; for (const { userID, threadID, latestMessage, latestReadMessage, } of latestMessages) { lastMessageExpression.append(SQL` WHEN user = ${userID} AND thread = ${threadID} THEN ${latestMessage} `); if (latestReadMessage) { shouldUpdateLastReadMessage = true; lastReadMessageExpression.append(SQL` WHEN user = ${userID} AND thread = ${threadID} THEN ${latestReadMessage} `); } } lastMessageExpression.append(SQL` ELSE last_message END) `); lastReadMessageExpression.append(SQL` ELSE last_read_message END) `); const conditions = latestMessages.map( ({ userID, threadID }) => SQL`(user = ${userID} AND thread = ${threadID})`, ); query.append(lastMessageExpression); if (shouldUpdateLastReadMessage) { query.append(lastReadMessageExpression); } query.append(SQL`WHERE `); query.append(mergeOrConditions(conditions)); dbQuery(query); } export default createMessages; diff --git a/keyserver/src/creators/update-creator.js b/keyserver/src/creators/update-creator.js index e5d4df5e9..4878d3d7d 100644 --- a/keyserver/src/creators/update-creator.js +++ b/keyserver/src/creators/update-creator.js @@ -1,794 +1,793 @@ // @flow import invariant from 'invariant'; import { nonThreadCalendarFilters } from 'lib/selectors/calendar-filter-selectors.js'; import { keyForUpdateData, keyForUpdateInfo, rawUpdateInfoFromUpdateData, } from 'lib/shared/update-utils.js'; import { type RawEntryInfo, type FetchEntryInfosBase, type CalendarQuery, defaultCalendarQuery, } from 'lib/types/entry-types.js'; import { defaultNumberPerThread, type FetchMessageInfosResult, } from 'lib/types/message-types.js'; import { type UpdateTarget, redisMessageTypes, type NewUpdatesRedisMessage, } from 'lib/types/redis-types.js'; import type { RawThreadInfo } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { type ServerUpdateInfo, type UpdateData, type RawUpdateInfo, type CreateUpdatesResult, } from 'lib/types/update-types.js'; import type { UserInfos, LoggedInUserInfo, OldLoggedInUserInfo, } from 'lib/types/user-types.js'; import { promiseAll } from 'lib/utils/promises.js'; import createIDs from './id-creator.js'; import { dbQuery, SQL, mergeAndConditions } from '../database/database.js'; import type { SQLStatementType } from '../database/types.js'; import { deleteUpdatesByConditions } from '../deleters/update-deleters.js'; import { fetchEntryInfos, fetchEntryInfosByID, } from '../fetchers/entry-fetchers.js'; import { fetchMessageInfos } from '../fetchers/message-fetchers.js'; import { fetchThreadInfos, type FetchThreadInfosResult, } from '../fetchers/thread-fetchers.js'; import { fetchKnownUserInfos, fetchCurrentUserInfo, } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { channelNameForUpdateTarget, publisher } from '../socket/redis.js'; export type UpdatesForCurrentSession = // This is the default if no Viewer is passed, or if an isSocket Viewer is // passed in. We will broadcast to all valid sessions via Redis and return // nothing to the caller, relying on the current session's Redis listener to // pick up the updates and deliver them asynchronously. | 'broadcast' // This is the default if a non-isSocket Viewer is passed in. We avoid // broadcasting the update to the current session, and instead return the // update to the caller, who will handle delivering it to the client. | 'return' // This means we ignore any updates destined for the current session. // Presumably the caller knows what they are doing and has a different way of // communicating the relevant information to the client. | 'ignore'; type DeleteCondition = { +userID: string, +target: ?string, +types: 'all_types' | $ReadOnlySet, }; export type ViewerInfo = | { viewer: Viewer, calendarQuery?: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, } | { viewer: Viewer, calendarQuery: ?CalendarQuery, updatesForCurrentSession?: UpdatesForCurrentSession, threadInfos: { +[id: string]: RawThreadInfo }, }; const defaultUpdateCreationResult = { viewerUpdates: [], userInfos: {} }; const sortFunction = ( a: UpdateData | ServerUpdateInfo, b: UpdateData | ServerUpdateInfo, ) => a.time - b.time; const deleteUpdatesBatchSize = 500; // Creates rows in the updates table based on the inputed updateDatas. Returns // UpdateInfos pertaining to the provided viewerInfo, as well as related // UserInfos. If no viewerInfo is provided, no UpdateInfos will be returned. And // the update row won't have an updater column, meaning no session will be // excluded from the update. async function createUpdates( updateDatas: $ReadOnlyArray, passedViewerInfo?: ?ViewerInfo, ): Promise { if (updateDatas.length === 0) { return defaultUpdateCreationResult; } // viewer.session will throw for a script Viewer let viewerInfo = passedViewerInfo; if ( viewerInfo && (viewerInfo.viewer.isScriptViewer || !viewerInfo.viewer.loggedIn) ) { viewerInfo = null; } const sortedUpdateDatas = [...updateDatas].sort(sortFunction); const filteredUpdateDatas: UpdateData[] = []; const keyedUpdateDatas: Map = new Map(); for (const updateData of sortedUpdateDatas) { const key = keyForUpdateData(updateData); if (!key) { filteredUpdateDatas.push(updateData); continue; } const conditionKey = `${updateData.userID}|${key}`; const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} has conditionKey ` + `${conditionKey} but no deleteCondition`, ); const curUpdateDatas = keyedUpdateDatas.get(conditionKey); if (!curUpdateDatas) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const filteredCurrent = curUpdateDatas.filter(curUpdateData => filterOnDeleteCondition(curUpdateData, deleteCondition), ); if (filteredCurrent.length === 0) { keyedUpdateDatas.set(conditionKey, [updateData]); continue; } const isNewUpdateDataFiltered = !filteredCurrent.every(curUpdateData => { const curDeleteCondition = getDeleteCondition(curUpdateData); invariant( curDeleteCondition, `updateData of type ${curUpdateData.type} is in keyedUpdateDatas ` + "but doesn't have a deleteCondition", ); return filterOnDeleteCondition(updateData, curDeleteCondition); }); if (!isNewUpdateDataFiltered) { filteredCurrent.push(updateData); } keyedUpdateDatas.set(conditionKey, filteredCurrent); } for (const keyUpdateDatas of keyedUpdateDatas.values()) { filteredUpdateDatas.push(...keyUpdateDatas); } const ids = await createIDs('updates', filteredUpdateDatas.length); let updatesForCurrentSession = viewerInfo && viewerInfo.updatesForCurrentSession; if (!updatesForCurrentSession && viewerInfo) { updatesForCurrentSession = viewerInfo.viewer.isSocket ? 'broadcast' : 'return'; } else if (!updatesForCurrentSession) { updatesForCurrentSession = 'broadcast'; } const dontBroadcastSession = updatesForCurrentSession !== 'broadcast' && viewerInfo ? viewerInfo.viewer.session : null; const publishInfos: Map = new Map(); const viewerRawUpdateInfos: RawUpdateInfo[] = []; const insertRows: (?(number | string))[][] = []; const earliestTime: Map = new Map(); for (let i = 0; i < filteredUpdateDatas.length; i++) { const updateData = filteredUpdateDatas[i]; let content; if (updateData.type === updateTypes.DELETE_ACCOUNT) { content = JSON.stringify({ deletedUserID: updateData.deletedUserID }); } else if (updateData.type === updateTypes.UPDATE_THREAD) { content = JSON.stringify({ threadID: updateData.threadID }); } else if (updateData.type === updateTypes.UPDATE_THREAD_READ_STATUS) { const { threadID, unread } = updateData; content = JSON.stringify({ threadID, unread }); } else if ( updateData.type === updateTypes.DELETE_THREAD || updateData.type === updateTypes.JOIN_THREAD ) { const { threadID } = updateData; content = JSON.stringify({ threadID }); } else if (updateData.type === updateTypes.BAD_DEVICE_TOKEN) { const { deviceToken } = updateData; content = JSON.stringify({ deviceToken }); } else if (updateData.type === updateTypes.UPDATE_ENTRY) { const { entryID } = updateData; content = JSON.stringify({ entryID }); } else if (updateData.type === updateTypes.UPDATE_CURRENT_USER) { // user column contains all the info we need to construct the UpdateInfo content = null; } else if (updateData.type === updateTypes.UPDATE_USER) { const { updatedUserID } = updateData; content = JSON.stringify({ updatedUserID }); } else { invariant(false, `unrecognized updateType ${updateData.type}`); } const target = getTargetFromUpdateData(updateData); const rawUpdateInfo = rawUpdateInfoFromUpdateData(updateData, ids[i]); if (!target || !dontBroadcastSession || target !== dontBroadcastSession) { const updateTarget = target ? { userID: updateData.userID, sessionID: target } : { userID: updateData.userID }; const channelName = channelNameForUpdateTarget(updateTarget); let publishInfo = publishInfos.get(channelName); if (!publishInfo) { publishInfo = { updateTarget, rawUpdateInfos: [] }; publishInfos.set(channelName, publishInfo); } publishInfo.rawUpdateInfos.push(rawUpdateInfo); } if ( updatesForCurrentSession === 'return' && viewerInfo && updateData.userID === viewerInfo.viewer.id && (!target || target === viewerInfo.viewer.session) ) { viewerRawUpdateInfos.push(rawUpdateInfo); } if (viewerInfo && target && viewerInfo.viewer.session === target) { // In the case where this update is being created only for the current // session, there's no reason to insert a row into the updates table continue; } const key = keyForUpdateData(updateData); if (key) { const conditionKey = `${updateData.userID}|${key}`; const currentEarliestTime = earliestTime.get(conditionKey); if (!currentEarliestTime || updateData.time < currentEarliestTime) { earliestTime.set(conditionKey, updateData.time); } } const insertRow = [ ids[i], updateData.userID, updateData.type, key, content, updateData.time, dontBroadcastSession, target, ]; insertRows.push(insertRow); } type DeleteUpdatesConditions = { key: string, target?: string, types?: number[], time?: number, }; const usersByConditions: Map< string, { conditions: DeleteUpdatesConditions, users: Set, }, > = new Map(); for (const [conditionKey, keyUpdateDatas] of keyedUpdateDatas) { const deleteConditionByTarget: Map = new Map(); for (const updateData of keyUpdateDatas) { const deleteCondition = getDeleteCondition(updateData); invariant( deleteCondition, `updateData of type ${updateData.type} is in keyedUpdateDatas but ` + "doesn't have a deleteCondition", ); const { target, types } = deleteCondition; const existingDeleteCondition = deleteConditionByTarget.get(target); if (!existingDeleteCondition) { deleteConditionByTarget.set(target, deleteCondition); continue; } const existingTypes = existingDeleteCondition.types; if (existingTypes === 'all_types') { continue; } else if (types === 'all_types') { deleteConditionByTarget.set(target, deleteCondition); continue; } const mergedTypes = new Set([...types, ...existingTypes]); deleteConditionByTarget.set(target, { ...deleteCondition, types: mergedTypes, }); } for (const deleteCondition of deleteConditionByTarget.values()) { const { userID, target, types } = deleteCondition; const key = conditionKey.split('|')[1]; const conditions: DeleteUpdatesConditions = { key }; if (target) { conditions.target = target; } if (types !== 'all_types') { invariant(types.size > 0, 'deleteCondition had empty types set'); conditions.types = [...types]; } const earliestTimeForCondition = earliestTime.get(conditionKey); if (earliestTimeForCondition) { conditions.time = earliestTimeForCondition; } const conditionsKey = JSON.stringify(conditions); if (!usersByConditions.has(conditionsKey)) { usersByConditions.set(conditionsKey, { conditions, users: new Set(), }); } usersByConditions.get(conditionsKey)?.users.add(userID); } } const deleteSQLConditions: SQLStatementType[] = []; for (const { conditions, users } of usersByConditions.values()) { const sqlConditions = [ SQL`u.user IN (${[...users]})`, SQL`u.key = ${conditions.key}`, ]; if (conditions.target) { sqlConditions.push(SQL`u.target = ${conditions.target}`); } if (conditions.types) { sqlConditions.push(SQL`u.type IN (${conditions.types})`); } if (conditions.time) { sqlConditions.push(SQL`u.time < ${conditions.time}`); } deleteSQLConditions.push(mergeAndConditions(sqlConditions)); } const promises = {}; if (insertRows.length > 0) { const insertQuery = SQL` INSERT INTO updates(id, user, type, \`key\`, content, time, updater, target) `; insertQuery.append(SQL`VALUES ${insertRows}`); promises.insert = dbQuery(insertQuery); } if (publishInfos.size > 0) { promises.redis = redisPublish(publishInfos.values(), dontBroadcastSession); } if (deleteSQLConditions.length > 0) { promises.delete = (async () => { while (deleteSQLConditions.length > 0) { const batch = deleteSQLConditions.splice(0, deleteUpdatesBatchSize); await deleteUpdatesByConditions(batch); } })(); } if (viewerRawUpdateInfos.length > 0) { invariant(viewerInfo, 'should be set'); promises.updatesResult = fetchUpdateInfosWithRawUpdateInfos( viewerRawUpdateInfos, viewerInfo, ); } const { updatesResult } = await promiseAll(promises); if (!updatesResult) { return defaultUpdateCreationResult; } const { updateInfos, userInfos } = updatesResult; return { viewerUpdates: updateInfos, userInfos }; } export type FetchUpdatesResult = { +updateInfos: $ReadOnlyArray, +userInfos: UserInfos, }; async function fetchUpdateInfosWithRawUpdateInfos( rawUpdateInfos: $ReadOnlyArray, viewerInfo: ViewerInfo, ): Promise { const { viewer } = viewerInfo; const threadIDsNeedingFetch = new Set(); const entryIDsNeedingFetch = new Set(); let currentUserNeedsFetch = false; const threadIDsNeedingDetailedFetch = new Set(); // entries and messages for (const rawUpdateInfo of rawUpdateInfos) { if ( !viewerInfo.threadInfos && (rawUpdateInfo.type === updateTypes.UPDATE_THREAD || rawUpdateInfo.type === updateTypes.JOIN_THREAD) ) { threadIDsNeedingFetch.add(rawUpdateInfo.threadID); } if (rawUpdateInfo.type === updateTypes.JOIN_THREAD) { threadIDsNeedingDetailedFetch.add(rawUpdateInfo.threadID); } else if (rawUpdateInfo.type === updateTypes.UPDATE_ENTRY) { entryIDsNeedingFetch.add(rawUpdateInfo.entryID); } else if (rawUpdateInfo.type === updateTypes.UPDATE_CURRENT_USER) { currentUserNeedsFetch = true; } } const promises = {}; if (!viewerInfo.threadInfos && threadIDsNeedingFetch.size > 0) { - promises.threadResult = fetchThreadInfos( - viewer, - SQL`t.id IN (${[...threadIDsNeedingFetch]})`, - ); + promises.threadResult = fetchThreadInfos(viewer, { + threadIDs: threadIDsNeedingFetch, + }); } let calendarQuery: ?CalendarQuery = viewerInfo.calendarQuery ? viewerInfo.calendarQuery : null; if (!calendarQuery && viewer.hasSessionInfo) { // This should only ever happen for "legacy" clients who call in without // providing this information. These clients wouldn't know how to deal with // the corresponding UpdateInfos anyways, so no reason to be worried. calendarQuery = viewer.calendarQuery; } else if (!calendarQuery) { calendarQuery = defaultCalendarQuery(viewer.platform, viewer.timeZone); } if (threadIDsNeedingDetailedFetch.size > 0) { const messageSelectionCriteria = { threadCursors: {} }; for (const threadID of threadIDsNeedingDetailedFetch) { messageSelectionCriteria.threadCursors[threadID] = false; } promises.messageInfosResult = fetchMessageInfos( viewer, messageSelectionCriteria, defaultNumberPerThread, ); const threadCalendarQuery = { ...calendarQuery, filters: [ ...nonThreadCalendarFilters(calendarQuery.filters), { type: 'threads', threadIDs: [...threadIDsNeedingDetailedFetch] }, ], }; promises.calendarResult = fetchEntryInfos(viewer, [threadCalendarQuery]); } if (entryIDsNeedingFetch.size > 0) { promises.entryInfosResult = fetchEntryInfosByID(viewer, [ ...entryIDsNeedingFetch, ]); } if (currentUserNeedsFetch) { promises.currentUserInfoResult = (async () => { const currentUserInfo = await fetchCurrentUserInfo(viewer); invariant(currentUserInfo.anonymous === undefined, 'should be logged in'); return currentUserInfo; })(); } const { threadResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, } = await promiseAll(promises); let threadInfosResult; if (viewerInfo.threadInfos) { const { threadInfos } = viewerInfo; threadInfosResult = { threadInfos }; } else if (threadResult) { threadInfosResult = threadResult; } else { threadInfosResult = { threadInfos: {} }; } return await updateInfosFromRawUpdateInfos(viewer, rawUpdateInfos, { threadInfosResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, }); } export type UpdateInfosRawData = { threadInfosResult: FetchThreadInfosResult, messageInfosResult: ?FetchMessageInfosResult, calendarResult: ?FetchEntryInfosBase, entryInfosResult: ?$ReadOnlyArray, currentUserInfoResult: ?OldLoggedInUserInfo | LoggedInUserInfo, }; async function updateInfosFromRawUpdateInfos( viewer: Viewer, rawUpdateInfos: $ReadOnlyArray, rawData: UpdateInfosRawData, ): Promise { const { threadInfosResult, messageInfosResult, calendarResult, entryInfosResult, currentUserInfoResult, } = rawData; const updateInfos = []; const userIDsToFetch = new Set(); for (const rawUpdateInfo of rawUpdateInfos) { if (rawUpdateInfo.type === updateTypes.DELETE_ACCOUNT) { updateInfos.push({ type: updateTypes.DELETE_ACCOUNT, id: rawUpdateInfo.id, time: rawUpdateInfo.time, deletedUserID: rawUpdateInfo.deletedUserID, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_THREAD) { const threadInfo = threadInfosResult.threadInfos[rawUpdateInfo.threadID]; if (!threadInfo) { console.warn( "failed to hydrate updateTypes.UPDATE_THREAD because we couldn't " + `fetch RawThreadInfo for ${rawUpdateInfo.threadID}`, ); continue; } updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadInfo, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS) { updateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadID: rawUpdateInfo.threadID, unread: rawUpdateInfo.unread, }); } else if (rawUpdateInfo.type === updateTypes.DELETE_THREAD) { updateInfos.push({ type: updateTypes.DELETE_THREAD, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadID: rawUpdateInfo.threadID, }); } else if (rawUpdateInfo.type === updateTypes.JOIN_THREAD) { const threadInfo = threadInfosResult.threadInfos[rawUpdateInfo.threadID]; if (!threadInfo) { console.warn( "failed to hydrate updateTypes.JOIN_THREAD because we couldn't " + `fetch RawThreadInfo for ${rawUpdateInfo.threadID}`, ); continue; } const rawEntryInfos = []; invariant(calendarResult, 'should be set'); for (const entryInfo of calendarResult.rawEntryInfos) { if (entryInfo.threadID === rawUpdateInfo.threadID) { rawEntryInfos.push(entryInfo); } } const rawMessageInfos = []; invariant(messageInfosResult, 'should be set'); for (const messageInfo of messageInfosResult.rawMessageInfos) { if (messageInfo.threadID === rawUpdateInfo.threadID) { rawMessageInfos.push(messageInfo); } } updateInfos.push({ type: updateTypes.JOIN_THREAD, id: rawUpdateInfo.id, time: rawUpdateInfo.time, threadInfo, rawMessageInfos, truncationStatus: messageInfosResult.truncationStatuses[rawUpdateInfo.threadID], rawEntryInfos, }); } else if (rawUpdateInfo.type === updateTypes.BAD_DEVICE_TOKEN) { updateInfos.push({ type: updateTypes.BAD_DEVICE_TOKEN, id: rawUpdateInfo.id, time: rawUpdateInfo.time, deviceToken: rawUpdateInfo.deviceToken, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_ENTRY) { invariant(entryInfosResult, 'should be set'); const entryInfo = entryInfosResult.find( candidate => candidate.id === rawUpdateInfo.entryID, ); if (!entryInfo) { console.warn( "failed to hydrate updateTypes.UPDATE_ENTRY because we couldn't " + `fetch RawEntryInfo for ${rawUpdateInfo.entryID}`, ); continue; } updateInfos.push({ type: updateTypes.UPDATE_ENTRY, id: rawUpdateInfo.id, time: rawUpdateInfo.time, entryInfo, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_CURRENT_USER) { invariant(currentUserInfoResult, 'should be set'); updateInfos.push({ type: updateTypes.UPDATE_CURRENT_USER, id: rawUpdateInfo.id, time: rawUpdateInfo.time, currentUserInfo: currentUserInfoResult, }); } else if (rawUpdateInfo.type === updateTypes.UPDATE_USER) { updateInfos.push({ type: updateTypes.UPDATE_USER, id: rawUpdateInfo.id, time: rawUpdateInfo.time, updatedUserID: rawUpdateInfo.updatedUserID, }); userIDsToFetch.add(rawUpdateInfo.updatedUserID); } else { invariant(false, `unrecognized updateType ${rawUpdateInfo.type}`); } } let userInfos = {}; if (userIDsToFetch.size > 0) { userInfos = await fetchKnownUserInfos(viewer, [...userIDsToFetch]); } updateInfos.sort(sortFunction); // Now we'll attempt to merge UpdateInfos so that we only have one per key const updateForKey: Map = new Map(); const mergedUpdates: ServerUpdateInfo[] = []; for (const updateInfo of updateInfos) { const key = keyForUpdateInfo(updateInfo); if (!key) { mergedUpdates.push(updateInfo); continue; } else if ( updateInfo.type === updateTypes.DELETE_THREAD || updateInfo.type === updateTypes.JOIN_THREAD || updateInfo.type === updateTypes.DELETE_ACCOUNT ) { updateForKey.set(key, updateInfo); continue; } const currentUpdateInfo = updateForKey.get(key); if (!currentUpdateInfo) { updateForKey.set(key, updateInfo); } else if ( updateInfo.type === updateTypes.UPDATE_THREAD && currentUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS ) { // UPDATE_THREAD trumps UPDATE_THREAD_READ_STATUS // Note that we keep the oldest UPDATE_THREAD updateForKey.set(key, updateInfo); } else if ( updateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS && currentUpdateInfo.type === updateTypes.UPDATE_THREAD_READ_STATUS ) { // If we only have UPDATE_THREAD_READ_STATUS, keep the most recent updateForKey.set(key, updateInfo); } else if (updateInfo.type === updateTypes.UPDATE_ENTRY) { updateForKey.set(key, updateInfo); } else if (updateInfo.type === updateTypes.UPDATE_CURRENT_USER) { updateForKey.set(key, updateInfo); } } for (const [, updateInfo] of updateForKey) { mergedUpdates.push(updateInfo); } mergedUpdates.sort(sortFunction); return { updateInfos: mergedUpdates, userInfos }; } type PublishInfo = { updateTarget: UpdateTarget, rawUpdateInfos: RawUpdateInfo[], }; async function redisPublish( publishInfos: Iterator, dontBroadcastSession: ?string, ): Promise { for (const publishInfo of publishInfos) { const { updateTarget, rawUpdateInfos } = publishInfo; const redisMessage: NewUpdatesRedisMessage = { type: redisMessageTypes.NEW_UPDATES, updates: rawUpdateInfos, }; if (!updateTarget.sessionID && dontBroadcastSession) { redisMessage.ignoreSession = dontBroadcastSession; } publisher.sendMessage(updateTarget, redisMessage); } } function getTargetFromUpdateData(updateData: UpdateData): ?string { if (updateData.targetSession) { return updateData.targetSession; } else if (updateData.targetCookie) { return updateData.targetCookie; } else { return null; } } function getDeleteCondition(updateData: UpdateData): ?DeleteCondition { let types; if (updateData.type === updateTypes.DELETE_ACCOUNT) { types = new Set([updateTypes.DELETE_ACCOUNT, updateTypes.UPDATE_USER]); } else if (updateData.type === updateTypes.UPDATE_THREAD) { types = new Set([ updateTypes.UPDATE_THREAD, updateTypes.UPDATE_THREAD_READ_STATUS, ]); } else if (updateData.type === updateTypes.UPDATE_THREAD_READ_STATUS) { types = new Set([updateTypes.UPDATE_THREAD_READ_STATUS]); } else if ( updateData.type === updateTypes.DELETE_THREAD || updateData.type === updateTypes.JOIN_THREAD ) { types = 'all_types'; } else if (updateData.type === updateTypes.UPDATE_ENTRY) { types = 'all_types'; } else if (updateData.type === updateTypes.UPDATE_CURRENT_USER) { types = new Set([updateTypes.UPDATE_CURRENT_USER]); } else if (updateData.type === updateTypes.UPDATE_USER) { types = new Set([updateTypes.UPDATE_USER]); } else { return null; } const target = getTargetFromUpdateData(updateData); const { userID } = updateData; return { userID, target, types }; } function filterOnDeleteCondition( updateData: UpdateData, deleteCondition: DeleteCondition, ): boolean { invariant( updateData.userID === deleteCondition.userID, `updateData of type ${updateData.type} being compared to wrong userID`, ); if (deleteCondition.target) { const target = getTargetFromUpdateData(updateData); if (target !== deleteCondition.target) { return true; } } if (deleteCondition.types === 'all_types') { return false; } return !deleteCondition.types.has(updateData.type); } export { createUpdates, fetchUpdateInfosWithRawUpdateInfos }; diff --git a/keyserver/src/deleters/thread-deleters.js b/keyserver/src/deleters/thread-deleters.js index b4c479e4f..6d5966b5e 100644 --- a/keyserver/src/deleters/thread-deleters.js +++ b/keyserver/src/deleters/thread-deleters.js @@ -1,140 +1,140 @@ // @flow import { permissionLookup } from 'lib/permissions/thread-permissions.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { type ThreadDeletionRequest, type LeaveThreadResult, } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { ServerError } from 'lib/utils/errors.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchServerThreadInfos, fetchContainedThreadIDs, } from '../fetchers/thread-fetchers.js'; import { fetchThreadPermissionsBlob } from '../fetchers/thread-permission-fetchers.js'; import { fetchUpdateInfoForThreadDeletion } from '../fetchers/update-fetchers.js'; import { rescindPushNotifs } from '../push/rescind.js'; import type { Viewer } from '../session/viewer.js'; async function deleteThread( viewer: Viewer, threadDeletionRequest: ThreadDeletionRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const { threadID } = threadDeletionRequest; const permissionsBlob = await fetchThreadPermissionsBlob(viewer, threadID); if (!permissionsBlob) { // This should only occur if the first request goes through but the client // never receives the response const { updateInfos } = await fetchUpdateInfoForThreadDeletion( viewer, threadID, ); return { updatesResult: { newUpdates: updateInfos } }; } const hasPermission = permissionLookup( permissionsBlob, threadPermissions.DELETE_THREAD, ); if (!hasPermission) { throw new ServerError('invalid_credentials'); } // TODO: handle descendant thread permission update correctly. // thread-permission-updaters should be used for descendant threads. const threadIDs = await fetchContainedThreadIDs(threadID); const [{ threadInfos: serverThreadInfos }] = await Promise.all([ - fetchServerThreadInfos(SQL`t.id IN (${threadIDs})`), + fetchServerThreadInfos({ threadIDs: new Set(threadID) }), rescindPushNotifs( SQL`n.thread IN (${threadIDs})`, SQL`IF(m.thread IN (${threadIDs}), NULL, m.thread)`, ), ]); const query = SQL` DELETE t, ic, d, id, e, ie, re, ire, mm, r, ir, ms, im, up, iu, f, n, ino FROM threads t LEFT JOIN ids ic ON ic.id = t.id LEFT JOIN days d ON d.thread = t.id LEFT JOIN ids id ON id.id = d.id LEFT JOIN entries e ON e.day = d.id LEFT JOIN ids ie ON ie.id = e.id LEFT JOIN revisions re ON re.entry = e.id LEFT JOIN ids ire ON ire.id = re.id LEFT JOIN memberships mm ON mm.thread = t.id LEFT JOIN roles r ON r.thread = t.id LEFT JOIN ids ir ON ir.id = r.id LEFT JOIN messages ms ON ms.thread = t.id LEFT JOIN ids im ON im.id = ms.id LEFT JOIN uploads up ON (up.container = ms.id OR up.container = t.id) LEFT JOIN ids iu ON iu.id = up.id LEFT JOIN focused f ON f.thread = t.id LEFT JOIN notifications n ON n.thread = t.id LEFT JOIN ids ino ON ino.id = n.id WHERE t.id IN (${threadIDs}) `; const time = Date.now(); const updateDatas = []; for (const containedThreadID of threadIDs) { for (const memberInfo of serverThreadInfos[containedThreadID].members) { updateDatas.push({ type: updateTypes.DELETE_THREAD, userID: memberInfo.id, time, threadID: containedThreadID, }); } } const [{ viewerUpdates }] = await Promise.all([ createUpdates(updateDatas, { viewer, updatesForCurrentSession: 'return' }), dbQuery(query), ]); return { updatesResult: { newUpdates: viewerUpdates } }; } async function deleteInaccessibleThreads(): Promise { // A thread is considered "inaccessible" if it has no membership rows. Note // that membership rows exist whenever a user can see a thread, even if they // are not technically a member (in which case role=0). For now, we're also // excluding threads with children, since to properly delete those we would // need to update their parent_thread_id, and possibly change their type. await dbQuery(SQL` DELETE t, i, m2, d, id, e, ie, re, ire, r, ir, ms, im, up, iu, f, n, ino FROM threads t LEFT JOIN ids i ON i.id = t.id LEFT JOIN memberships m1 ON m1.thread = t.id AND m1.role > -1 LEFT JOIN threads c ON c.parent_thread_id = t.id LEFT JOIN memberships m2 ON m2.thread = t.id LEFT JOIN days d ON d.thread = t.id LEFT JOIN ids id ON id.id = d.id LEFT JOIN entries e ON e.day = d.id LEFT JOIN ids ie ON ie.id = e.id LEFT JOIN revisions re ON re.entry = e.id LEFT JOIN ids ire ON ire.id = re.id LEFT JOIN roles r ON r.thread = t.id LEFT JOIN ids ir ON ir.id = r.id LEFT JOIN messages ms ON ms.thread = t.id LEFT JOIN ids im ON im.id = ms.id LEFT JOIN uploads up ON (up.container = ms.id OR up.container = t.id) LEFT JOIN ids iu ON iu.id = up.id LEFT JOIN focused f ON f.thread = t.id LEFT JOIN notifications n ON n.thread = t.id LEFT JOIN ids ino ON ino.id = n.id WHERE m1.thread IS NULL AND c.id IS NULL `); } export { deleteThread, deleteInaccessibleThreads }; diff --git a/keyserver/src/fetchers/thread-fetchers.js b/keyserver/src/fetchers/thread-fetchers.js index 6c01cc8da..dadfc87fe 100644 --- a/keyserver/src/fetchers/thread-fetchers.js +++ b/keyserver/src/fetchers/thread-fetchers.js @@ -1,313 +1,348 @@ // @flow import invariant from 'invariant'; import { getAllThreadPermissions } from 'lib/permissions/thread-permissions.js'; import { rawThreadInfoFromServerThreadInfo, getContainingThreadID, getCommunity, } from 'lib/shared/thread-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { AvatarDBContent, ClientAvatar } from 'lib/types/avatar-types.js'; import type { RawMessageInfo, MessageInfo } from 'lib/types/message-types.js'; import { threadTypes, type ThreadType } from 'lib/types/thread-types-enum.js'; import { type RawThreadInfo, type ServerThreadInfo, } from 'lib/types/thread-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { getUploadURL } from './upload-fetchers.js'; -import { dbQuery, SQL } from '../database/database.js'; +import { dbQuery, SQL, mergeAndConditions } from '../database/database.js'; import type { SQLStatementType } from '../database/types.js'; import type { Viewer } from '../session/viewer.js'; +type FetchThreadInfosFilter = $Shape<{ + +threadID: string, + +threadIDs: $ReadOnlySet, + +parentThreadID: string, + +sourceMessageID: string, +}>; +function constructWhereClause( + filter: FetchThreadInfosFilter, +): SQLStatementType { + const conditions = []; + + if (filter.threadID) { + conditions.push(SQL`t.id = ${filter.threadID}`); + } + + if (filter.threadIDs) { + conditions.push(SQL`t.id IN (${[...filter.threadIDs]})`); + } + + if (filter.parentThreadID) { + conditions.push(SQL`t.parent_thread_id = ${filter.parentThreadID}`); + } + + if (filter.sourceMessageID) { + conditions.push(SQL`t.source_message = ${filter.sourceMessageID}`); + } + + if (conditions.length === 0) { + return SQL``; + } + + const clause = mergeAndConditions(conditions); + return SQL`WHERE `.append(clause); +} + type FetchServerThreadInfosResult = { +threadInfos: { +[id: string]: ServerThreadInfo }, }; async function fetchServerThreadInfos( - condition?: SQLStatementType, + filter?: FetchThreadInfosFilter, ): Promise { - const whereClause = condition ? SQL`WHERE `.append(condition) : ''; + const whereClause = filter ? constructWhereClause(filter) : ''; const rolesQuery = SQL` SELECT t.id, t.default_role, r.id AS role, r.name, r.permissions FROM threads t LEFT JOIN roles r ON r.thread = t.id `.append(whereClause); const threadsQuery = SQL` SELECT t.id, t.name, t.parent_thread_id, t.containing_thread_id, t.community, t.depth, t.color, t.description, t.type, t.creation_time, t.source_message, t.replies_count, t.avatar, t.pinned_count, m.user, m.role, m.permissions, m.subscription, m.last_read_message < m.last_message AS unread, m.sender, up.id AS upload_id, up.secret AS upload_secret FROM threads t LEFT JOIN memberships m ON m.thread = t.id AND m.role >= 0 LEFT JOIN uploads up ON up.container = t.id ` .append(whereClause) .append(SQL` ORDER BY m.user ASC`); const [[threadsResult], [rolesResult]] = await Promise.all([ dbQuery(threadsQuery), dbQuery(rolesQuery), ]); const threadInfos = {}; for (const threadsRow of threadsResult) { const threadID = threadsRow.id.toString(); if (!threadInfos[threadID]) { threadInfos[threadID] = { id: threadID, type: threadsRow.type, name: threadsRow.name ? threadsRow.name : '', description: threadsRow.description ? threadsRow.description : '', color: threadsRow.color, creationTime: threadsRow.creation_time, parentThreadID: threadsRow.parent_thread_id ? threadsRow.parent_thread_id.toString() : null, containingThreadID: threadsRow.containing_thread_id ? threadsRow.containing_thread_id.toString() : null, depth: threadsRow.depth, community: threadsRow.community ? threadsRow.community.toString() : null, members: [], roles: {}, repliesCount: threadsRow.replies_count, pinnedCount: threadsRow.pinned_count, }; if (threadsRow.avatar) { const avatar: AvatarDBContent = JSON.parse(threadsRow.avatar); let clientAvatar: ?ClientAvatar; if (avatar && avatar.type !== 'image') { clientAvatar = avatar; } else if ( avatar && avatar.type === 'image' && threadsRow.upload_id && threadsRow.upload_secret ) { const uploadID = threadsRow.upload_id.toString(); invariant( uploadID === avatar.uploadID, `uploadID of upload should match uploadID of image avatar`, ); clientAvatar = { type: 'image', uri: getUploadURL(uploadID, threadsRow.upload_secret), }; } threadInfos[threadID] = { ...threadInfos[threadID], avatar: clientAvatar, }; } } const sourceMessageID = threadsRow.source_message?.toString(); if (sourceMessageID) { threadInfos[threadID].sourceMessageID = sourceMessageID; } if (threadsRow.user) { const userID = threadsRow.user.toString(); const allPermissions = getAllThreadPermissions( JSON.parse(threadsRow.permissions), threadID, ); threadInfos[threadID].members.push({ id: userID, permissions: allPermissions, role: threadsRow.role ? threadsRow.role.toString() : null, subscription: JSON.parse(threadsRow.subscription), unread: threadsRow.role ? !!threadsRow.unread : null, isSender: !!threadsRow.sender, }); } } for (const rolesRow of rolesResult) { const threadID = rolesRow.id.toString(); if (!rolesRow.role) { continue; } const role = rolesRow.role.toString(); if (!threadInfos[threadID].roles[role]) { threadInfos[threadID].roles[role] = { id: role, name: rolesRow.name, permissions: JSON.parse(rolesRow.permissions), isDefault: role === rolesRow.default_role.toString(), }; } } return { threadInfos }; } export type FetchThreadInfosResult = { +threadInfos: { +[id: string]: RawThreadInfo }, }; async function fetchThreadInfos( viewer: Viewer, - condition?: SQLStatementType, + filter?: FetchThreadInfosFilter, ): Promise { - const serverResult = await fetchServerThreadInfos(condition); + const serverResult = await fetchServerThreadInfos(filter); return rawThreadInfosFromServerThreadInfos(viewer, serverResult); } function rawThreadInfosFromServerThreadInfos( viewer: Viewer, serverResult: FetchServerThreadInfosResult, ): FetchThreadInfosResult { const viewerID = viewer.id; const codeVersionBelow209 = !hasMinCodeVersion(viewer.platformDetails, { native: 209, }); const codeVersionBelow213 = !hasMinCodeVersion(viewer.platformDetails, { native: 213, }); const codeVersionBelow221 = !hasMinCodeVersion(viewer.platformDetails, { native: 221, }); const threadInfos = {}; for (const threadID in serverResult.threadInfos) { const serverThreadInfo = serverResult.threadInfos[threadID]; const threadInfo = rawThreadInfoFromServerThreadInfo( serverThreadInfo, viewerID, { filterThreadEditAvatarPermission: codeVersionBelow213, excludePinInfo: codeVersionBelow209, filterManageInviteLinksPermission: codeVersionBelow221, }, ); if (threadInfo) { threadInfos[threadID] = threadInfo; } } return { threadInfos }; } async function verifyThreadIDs( threadIDs: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { if (threadIDs.length === 0) { return []; } const query = SQL`SELECT id FROM threads WHERE id IN (${threadIDs})`; const [result] = await dbQuery(query); const verified = []; for (const row of result) { verified.push(row.id.toString()); } return verified; } async function verifyThreadID(threadID: string): Promise { const result = await verifyThreadIDs([threadID]); return result.length !== 0; } type ThreadAncestry = { +containingThreadID: ?string, +community: ?string, +depth: number, }; async function determineThreadAncestry( parentThreadID: ?string, threadType: ThreadType, ): Promise { if (!parentThreadID) { return { containingThreadID: null, community: null, depth: 0 }; } - const parentThreadInfos = await fetchServerThreadInfos( - SQL`t.id = ${parentThreadID}`, - ); + const parentThreadInfos = await fetchServerThreadInfos({ + threadID: parentThreadID, + }); const parentThreadInfo = parentThreadInfos.threadInfos[parentThreadID]; if (!parentThreadInfo) { throw new ServerError('invalid_parameters'); } const containingThreadID = getContainingThreadID( parentThreadInfo, threadType, ); const community = getCommunity(parentThreadInfo); const depth = parentThreadInfo.depth + 1; return { containingThreadID, community, depth }; } function personalThreadQuery( firstMemberID: string, secondMemberID: string, ): SQLStatementType { return SQL` SELECT t.id FROM threads t INNER JOIN memberships m1 ON m1.thread = t.id AND m1.user = ${firstMemberID} INNER JOIN memberships m2 ON m2.thread = t.id AND m2.user = ${secondMemberID} WHERE t.type = ${threadTypes.PERSONAL} AND m1.role > 0 AND m2.role > 0 `; } async function fetchPersonalThreadID( viewerID: string, otherMemberID: string, ): Promise { const query = personalThreadQuery(viewerID, otherMemberID); const [threads] = await dbQuery(query); return threads[0]?.id.toString(); } async function serverThreadInfoFromMessageInfo( message: RawMessageInfo | MessageInfo, ): Promise { const threadID = message.threadID; - const threads = await fetchServerThreadInfos(SQL`t.id = ${threadID}`); + const threads = await fetchServerThreadInfos({ threadID }); return threads.threadInfos[threadID]; } async function fetchContainedThreadIDs( parentThreadID: string, ): Promise> { const query = SQL` WITH RECURSIVE thread_tree AS ( SELECT id, containing_thread_id FROM threads WHERE id = ${parentThreadID} UNION ALL SELECT t.id, t.containing_thread_id FROM threads t JOIN thread_tree tt ON t.containing_thread_id = tt.id ) SELECT id FROM thread_tree `; const [result] = await dbQuery(query); return result.map(row => row.id.toString()); } export { fetchServerThreadInfos, fetchThreadInfos, rawThreadInfosFromServerThreadInfos, verifyThreadIDs, verifyThreadID, determineThreadAncestry, personalThreadQuery, fetchPersonalThreadID, serverThreadInfoFromMessageInfo, fetchContainedThreadIDs, }; diff --git a/keyserver/src/fetchers/thread-permission-fetchers.js b/keyserver/src/fetchers/thread-permission-fetchers.js index 3839c96c5..4c297d632 100644 --- a/keyserver/src/fetchers/thread-permission-fetchers.js +++ b/keyserver/src/fetchers/thread-permission-fetchers.js @@ -1,366 +1,366 @@ // @flow import genesis from 'lib/facts/genesis.js'; import { permissionLookup, makePermissionsBlob, getRoleForPermissions, } from 'lib/permissions/thread-permissions.js'; import { relationshipBlockedInEitherDirection } from 'lib/shared/relationship-utils.js'; import { threadFrozenDueToBlock, permissionsDisabledByBlock, } from 'lib/shared/thread-utils.js'; import { userRelationshipStatus } from 'lib/types/relationship-types.js'; import type { ThreadPermission, ThreadPermissionsBlob, ThreadRolePermissionsBlob, } from 'lib/types/thread-permission-types.js'; import type { ThreadType } from 'lib/types/thread-types-enum.js'; import { fetchThreadInfos } from './thread-fetchers.js'; import { fetchKnownUserInfos } from './user-fetchers.js'; import { dbQuery, SQL } from '../database/database.js'; import type { Viewer } from '../session/viewer.js'; // Note that it's risky to verify permissions by inspecting the blob directly. // There are other factors that can override permissions in the permissions // blob, such as when one user blocks another. It's always better to go through // checkThreads and friends, or by looking at the ThreadInfo through // threadHasPermission. async function fetchThreadPermissionsBlob( viewer: Viewer, threadID: string, ): Promise { const viewerID = viewer.id; const query = SQL` SELECT permissions FROM memberships WHERE thread = ${threadID} AND user = ${viewerID} `; const [result] = await dbQuery(query); if (result.length === 0) { return null; } const row = result[0]; return JSON.parse(row.permissions); } function checkThreadPermission( viewer: Viewer, threadID: string, permission: ThreadPermission, ): Promise { return checkThread(viewer, threadID, [{ check: 'permission', permission }]); } function viewerIsMember(viewer: Viewer, threadID: string): Promise { return checkThread(viewer, threadID, [{ check: 'is_member' }]); } type Check = | { +check: 'is_member' } | { +check: 'permission', +permission: ThreadPermission }; function isThreadValid( permissions: ?ThreadPermissionsBlob, role: number, checks: $ReadOnlyArray, ): boolean { for (const check of checks) { if (check.check === 'is_member') { if (role <= 0) { return false; } } else if (check.check === 'permission') { if (!permissionLookup(permissions, check.permission)) { return false; } } } return true; } async function checkThreads( viewer: Viewer, threadIDs: $ReadOnlyArray, checks: $ReadOnlyArray, ): Promise> { if (viewer.isScriptViewer) { // script viewers are all-powerful return new Set(threadIDs); } const threadRows = await getValidThreads(viewer, threadIDs, checks); return new Set(threadRows.map(row => row.threadID)); } type PartialMembershipRow = { +threadID: string, +role: number, +permissions: ThreadPermissionsBlob, }; async function getValidThreads( viewer: Viewer, threadIDs: $ReadOnlyArray, checks: $ReadOnlyArray, ): Promise { const query = SQL` SELECT thread AS threadID, permissions, role FROM memberships WHERE thread IN (${threadIDs}) AND user = ${viewer.userID} `; const permissionsToCheck = []; for (const check of checks) { if (check.check === 'permission') { permissionsToCheck.push(check.permission); } } const [[result], disabledThreadIDs] = await Promise.all([ dbQuery(query), checkThreadsFrozen(viewer, permissionsToCheck, threadIDs), ]); return result .map(row => ({ ...row, threadID: row.threadID.toString(), permissions: JSON.parse(row.permissions), })) .filter( row => isThreadValid(row.permissions, row.role, checks) && !disabledThreadIDs.has(row.threadID), ); } async function checkThreadsFrozen( viewer: Viewer, permissionsToCheck: $ReadOnlyArray, threadIDs: $ReadOnlyArray, ) { const threadIDsWithDisabledPermissions = new Set(); const permissionMightBeDisabled = permissionsToCheck.some(permission => permissionsDisabledByBlock.has(permission), ); if (!permissionMightBeDisabled) { return threadIDsWithDisabledPermissions; } const [{ threadInfos }, userInfos] = await Promise.all([ - fetchThreadInfos(viewer, SQL`t.id IN (${[...threadIDs]})`), + fetchThreadInfos(viewer, { threadIDs: new Set(threadIDs) }), fetchKnownUserInfos(viewer), ]); for (const threadID in threadInfos) { const blockedThread = threadFrozenDueToBlock( threadInfos[threadID], viewer.id, userInfos, ); if (blockedThread) { threadIDsWithDisabledPermissions.add(threadID); } } return threadIDsWithDisabledPermissions; } async function checkIfThreadIsBlocked( viewer: Viewer, threadID: string, permission: ThreadPermission, ): Promise { const disabledThreadIDs = await checkThreadsFrozen( viewer, [permission], [threadID], ); return disabledThreadIDs.has(threadID); } async function checkThread( viewer: Viewer, threadID: string, checks: $ReadOnlyArray, ): Promise { const validThreads = await checkThreads(viewer, [threadID], checks); return validThreads.has(threadID); } // We pass this into getRoleForPermissions in order to check if a hypothetical // permissions blob would block membership by returning a non-positive result. // It doesn't matter what value we pass in, as long as it's positive. const arbitraryPositiveRole = '1'; type CandidateMembers = { +[key: string]: ?$ReadOnlyArray, ... }; type ValidateCandidateMembersParams = { +threadType: ThreadType, +parentThreadID: ?string, +containingThreadID: ?string, +defaultRolePermissions: ThreadRolePermissionsBlob, }; type ValidateCandidateMembersOptions = { +requireRelationship?: boolean }; async function validateCandidateMembers( viewer: Viewer, candidates: CandidateMembers, params: ValidateCandidateMembersParams, options?: ValidateCandidateMembersOptions, ): Promise { const requireRelationship = options?.requireRelationship ?? true; const allCandidatesSet = new Set(); for (const key in candidates) { const candidateGroup = candidates[key]; if (!candidateGroup) { continue; } for (const candidate of candidateGroup) { allCandidatesSet.add(candidate); } } const allCandidates = [...allCandidatesSet]; const fetchMembersPromise = fetchKnownUserInfos(viewer, allCandidates); const parentPermissionsPromise = (async () => { const parentPermissions = {}; if (!params.parentThreadID || allCandidates.length === 0) { return parentPermissions; } const parentPermissionsQuery = SQL` SELECT user, permissions FROM memberships WHERE thread = ${params.parentThreadID} AND user IN (${allCandidates}) `; const [result] = await dbQuery(parentPermissionsQuery); for (const row of result) { parentPermissions[row.user.toString()] = JSON.parse(row.permissions); } return parentPermissions; })(); const memberOfContainingThreadPromise: Promise< Map, > = (async () => { const results = new Map(); if (allCandidates.length === 0) { return results; } if (!params.containingThreadID) { for (const userID of allCandidates) { results.set(userID, 'no-containing-thread'); } return results; } const memberOfContainingThreadQuery = SQL` SELECT user, role AS containing_role FROM memberships WHERE thread = ${params.containingThreadID} AND user IN (${allCandidates}) `; const [result] = await dbQuery(memberOfContainingThreadQuery); for (const row of result) { results.set( row.user.toString(), row.containing_role > 0 ? 'member' : 'non-member', ); } return results; })(); const [fetchedMembers, parentPermissions, memberOfContainingThread] = await Promise.all([ fetchMembersPromise, parentPermissionsPromise, memberOfContainingThreadPromise, ]); const ignoreMembers = new Set(); for (const memberID of allCandidates) { const member = fetchedMembers[memberID]; if (!member && requireRelationship) { ignoreMembers.add(memberID); continue; } const relationshipStatus = member?.relationshipStatus; const memberRelationshipHasBlock = !!( relationshipStatus && relationshipBlockedInEitherDirection(relationshipStatus) ); if (memberRelationshipHasBlock) { ignoreMembers.add(memberID); continue; } const permissionsFromParent = parentPermissions[memberID]; if (memberOfContainingThread.get(memberID) === 'non-member') { ignoreMembers.add(memberID); continue; } const isParentThreadGenesis = params.parentThreadID === genesis.id; if ( (memberOfContainingThread.get(memberID) === 'no-containing-thread' || isParentThreadGenesis) && relationshipStatus !== userRelationshipStatus.FRIEND && requireRelationship ) { ignoreMembers.add(memberID); continue; } const permissions = makePermissionsBlob( params.defaultRolePermissions, permissionsFromParent, '-1', params.threadType, ); if (!permissions) { ignoreMembers.add(memberID); continue; } const targetRole = getRoleForPermissions( arbitraryPositiveRole, permissions, ); if (Number(targetRole) <= 0) { ignoreMembers.add(memberID); continue; } } if (ignoreMembers.size === 0) { return candidates; } const result = {}; for (const key in candidates) { const candidateGroup = candidates[key]; if (!candidateGroup) { result[key] = candidates[key]; continue; } result[key] = []; for (const candidate of candidateGroup) { if (!ignoreMembers.has(candidate)) { result[key].push(candidate); } } } return result; } export { fetchThreadPermissionsBlob, checkThreadPermission, viewerIsMember, checkThreads, getValidThreads, checkThread, checkIfThreadIsBlocked, validateCandidateMembers, }; diff --git a/keyserver/src/push/send.js b/keyserver/src/push/send.js index 838b7e782..5ec5f35b1 100644 --- a/keyserver/src/push/send.js +++ b/keyserver/src/push/send.js @@ -1,1331 +1,1329 @@ // @flow import apn from '@parse/node-apn'; import type { ResponseFailure } from '@parse/node-apn'; import invariant from 'invariant'; import _cloneDeep from 'lodash/fp/cloneDeep.js'; import _flow from 'lodash/fp/flow.js'; import _mapValues from 'lodash/fp/mapValues.js'; import _pickBy from 'lodash/fp/pickBy.js'; import t from 'tcomb'; import uuidv4 from 'uuid/v4.js'; import { oldValidUsernameRegex } from 'lib/shared/account-utils.js'; import { isMentioned } from 'lib/shared/mention-utils.js'; import { createMessageInfo, sortMessageInfoList, shimUnsupportedRawMessageInfos, } from 'lib/shared/message-utils.js'; import { messageSpecs } from 'lib/shared/messages/message-specs.js'; import { notifTextsForMessageInfo } from 'lib/shared/notif-utils.js'; import { rawThreadInfoFromServerThreadInfo, threadInfoFromRawThreadInfo, } from 'lib/shared/thread-utils.js'; import type { Platform, PlatformDetails } from 'lib/types/device-types.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import { type RawMessageInfo, type MessageData, } from 'lib/types/message-types.js'; import { rawMessageInfoValidator } from 'lib/types/message-types.js'; import type { WebNotification, WNSNotification, ResolvedNotifTexts, } from 'lib/types/notif-types.js'; import { resolvedNotifTextsValidator } from 'lib/types/notif-types.js'; import type { ServerThreadInfo } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { promiseAll } from 'lib/utils/promises.js'; import { tID, tPlatformDetails, tShape } from 'lib/utils/validation-utils.js'; import { prepareEncryptedIOSNotifications, prepareEncryptedAndroidNotifications, } from './crypto.js'; import { getAPNsNotificationTopic } from './providers.js'; import { rescindPushNotifs } from './rescind.js'; import type { NotificationTargetDevice, TargetedAPNsNotification, TargetedAndroidNotification, } from './types.js'; import { apnPush, fcmPush, getUnreadCounts, apnMaxNotificationPayloadByteSize, fcmMaxNotificationPayloadByteSize, wnsMaxNotificationPayloadByteSize, webPush, wnsPush, type WebPushError, type WNSPushError, } from './utils.js'; import createIDs from '../creators/id-creator.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL, mergeOrConditions } from '../database/database.js'; import type { CollapsableNotifInfo } from '../fetchers/message-fetchers.js'; import { fetchCollapsableNotifs } from '../fetchers/message-fetchers.js'; import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchUserInfos } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { getENSNames } from '../utils/ens-cache.js'; import { validateOutput } from '../utils/validation-utils.js'; type Device = { +platform: Platform, +deviceToken: string, +cookieID: string, +codeVersion: ?number, }; type PushUserInfo = { +devices: Device[], // messageInfos and messageDatas have the same key +messageInfos: RawMessageInfo[], +messageDatas: MessageData[], }; type Delivery = PushDelivery | { collapsedInto: string }; type NotificationRow = { +dbID: string, +userID: string, +threadID?: ?string, +messageID?: ?string, +collapseKey?: ?string, +deliveries: Delivery[], }; export type PushInfo = { [userID: string]: PushUserInfo }; async function sendPushNotifs(pushInfo: PushInfo) { if (Object.keys(pushInfo).length === 0) { return; } const [ unreadCounts, { usersToCollapsableNotifInfo, serverThreadInfos, userInfos }, dbIDs, ] = await Promise.all([ getUnreadCounts(Object.keys(pushInfo)), fetchInfos(pushInfo), createDBIDs(pushInfo), ]); const deliveryPromises = []; const notifications: Map = new Map(); for (const userID in usersToCollapsableNotifInfo) { const threadInfos = _flow( _mapValues((serverThreadInfo: ServerThreadInfo) => { const rawThreadInfo = rawThreadInfoFromServerThreadInfo( serverThreadInfo, userID, ); if (!rawThreadInfo) { return null; } return threadInfoFromRawThreadInfo(rawThreadInfo, userID, userInfos); }), _pickBy(threadInfo => threadInfo), )(serverThreadInfos); for (const notifInfo of usersToCollapsableNotifInfo[userID]) { const hydrateMessageInfo = (rawMessageInfo: RawMessageInfo) => createMessageInfo(rawMessageInfo, userID, userInfos, threadInfos); const newMessageInfos = []; const newRawMessageInfos = []; for (const newRawMessageInfo of notifInfo.newMessageInfos) { const newMessageInfo = hydrateMessageInfo(newRawMessageInfo); if (newMessageInfo) { newMessageInfos.push(newMessageInfo); newRawMessageInfos.push(newRawMessageInfo); } } if (newMessageInfos.length === 0) { continue; } const existingMessageInfos = notifInfo.existingMessageInfos .map(hydrateMessageInfo) .filter(Boolean); const allMessageInfos = sortMessageInfoList([ ...newMessageInfos, ...existingMessageInfos, ]); const [firstNewMessageInfo, ...remainingNewMessageInfos] = newMessageInfos; const { threadID } = firstNewMessageInfo; const threadInfo = threadInfos[threadID]; const updateBadge = threadInfo.currentUser.subscription.home; const displayBanner = threadInfo.currentUser.subscription.pushNotifs; const username = userInfos[userID] && userInfos[userID].username; const userWasMentioned = username && threadInfo.currentUser.role && oldValidUsernameRegex.test(username) && newMessageInfos.some(newMessageInfo => { const unwrappedMessageInfo = newMessageInfo.type === messageTypes.SIDEBAR_SOURCE ? newMessageInfo.sourceMessage : newMessageInfo; return ( unwrappedMessageInfo.type === messageTypes.TEXT && isMentioned(username, unwrappedMessageInfo.text) ); }); if (!updateBadge && !displayBanner && !userWasMentioned) { continue; } const badgeOnly = !displayBanner && !userWasMentioned; const notifTargetUserInfo = { id: userID, username }; const notifTexts = await notifTextsForMessageInfo( allMessageInfos, threadInfo, notifTargetUserInfo, getENSNames, ); if (!notifTexts) { continue; } const dbID = dbIDs.shift(); invariant(dbID, 'should have sufficient DB IDs'); const byPlatform = getDevicesByPlatform(pushInfo[userID].devices); const firstMessageID = firstNewMessageInfo.id; invariant(firstMessageID, 'RawMessageInfo.id should be set on server'); const notificationInfo = { source: 'new_message', dbID, userID, threadID, messageID: firstMessageID, collapseKey: notifInfo.collapseKey, }; const iosVersionsToTokens = byPlatform.get('ios'); if (iosVersionsToTokens) { for (const [codeVersion, devices] of iosVersionsToTokens) { const platformDetails = { platform: 'ios', codeVersion }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const deliveryPromise = (async () => { const targetedNotifications = await prepareAPNsNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount: unreadCounts[userID], platformDetails, }, devices, ); return await sendAPNsNotification('ios', targetedNotifications, { ...notificationInfo, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } const androidVersionsToTokens = byPlatform.get('android'); if (androidVersionsToTokens) { for (const [codeVersion, devices] of androidVersionsToTokens) { const platformDetails = { platform: 'android', codeVersion }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const deliveryPromise = (async () => { const targetedNotifications = await prepareAndroidNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount: unreadCounts[userID], platformDetails, dbID, }, devices, ); return await sendAndroidNotification(targetedNotifications, { ...notificationInfo, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } const webVersionsToTokens = byPlatform.get('web'); if (webVersionsToTokens) { for (const [codeVersion, devices] of webVersionsToTokens) { const platformDetails = { platform: 'web', codeVersion }; const deliveryPromise = (async () => { const notification = await prepareWebNotification({ notifTexts, threadID: threadInfo.id, unreadCount: unreadCounts[userID], platformDetails, }); const deviceTokens = devices.map(({ deviceToken }) => deviceToken); return await sendWebNotification(notification, deviceTokens, { ...notificationInfo, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } const macosVersionsToTokens = byPlatform.get('macos'); if (macosVersionsToTokens) { for (const [codeVersion, devices] of macosVersionsToTokens) { const platformDetails = { platform: 'macos', codeVersion }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const deliveryPromise = (async () => { const targetedNotifications = await prepareAPNsNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount: unreadCounts[userID], platformDetails, }, devices, ); return await sendAPNsNotification('macos', targetedNotifications, { ...notificationInfo, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } const windowsVersionsToTokens = byPlatform.get('windows'); if (windowsVersionsToTokens) { for (const [codeVersion, devices] of windowsVersionsToTokens) { const platformDetails = { platform: 'windows', codeVersion }; const deliveryPromise = (async () => { const notification = await prepareWNSNotification({ notifTexts, threadID: threadInfo.id, unreadCount: unreadCounts[userID], platformDetails, }); const deviceTokens = devices.map(({ deviceToken }) => deviceToken); return await sendWNSNotification(notification, deviceTokens, { ...notificationInfo, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } for (const newMessageInfo of remainingNewMessageInfos) { const newDBID = dbIDs.shift(); invariant(newDBID, 'should have sufficient DB IDs'); const messageID = newMessageInfo.id; invariant(messageID, 'RawMessageInfo.id should be set on server'); notifications.set(newDBID, { dbID: newDBID, userID, threadID: newMessageInfo.threadID, messageID, collapseKey: notifInfo.collapseKey, deliveries: [{ collapsedInto: dbID }], }); } } } const cleanUpPromises = []; if (dbIDs.length > 0) { const query = SQL`DELETE FROM ids WHERE id IN (${dbIDs})`; cleanUpPromises.push(dbQuery(query)); } const [deliveryResults] = await Promise.all([ Promise.all(deliveryPromises), Promise.all(cleanUpPromises), ]); await saveNotifResults(deliveryResults, notifications, true); } async function sendRescindNotifs(rescindInfo: PushInfo) { if (Object.keys(rescindInfo).length === 0) { return; } const usersToCollapsableNotifInfo = await fetchCollapsableNotifs(rescindInfo); const promises = []; for (const userID in usersToCollapsableNotifInfo) { for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const existingMessageInfo of notifInfo.existingMessageInfos) { const rescindCondition = SQL` n.user = ${userID} AND n.thread = ${existingMessageInfo.threadID} AND n.message = ${existingMessageInfo.id} `; promises.push(rescindPushNotifs(rescindCondition)); } } } await Promise.all(promises); } // The results in deliveryResults will be combined with the rows // in rowsToSave and then written to the notifications table async function saveNotifResults( deliveryResults: $ReadOnlyArray, inputRowsToSave: Map, rescindable: boolean, ) { const rowsToSave = new Map(inputRowsToSave); const allInvalidTokens = []; for (const deliveryResult of deliveryResults) { const { info, delivery, invalidTokens } = deliveryResult; const { dbID, userID } = info; const curNotifRow = rowsToSave.get(dbID); if (curNotifRow) { curNotifRow.deliveries.push(delivery); } else { // Ternary expressions for Flow const threadID = info.threadID ? info.threadID : null; const messageID = info.messageID ? info.messageID : null; const collapseKey = info.collapseKey ? info.collapseKey : null; rowsToSave.set(dbID, { dbID, userID, threadID, messageID, collapseKey, deliveries: [delivery], }); } if (invalidTokens) { allInvalidTokens.push({ userID, tokens: invalidTokens, }); } } const notificationRows = []; for (const notification of rowsToSave.values()) { notificationRows.push([ notification.dbID, notification.userID, notification.threadID, notification.messageID, notification.collapseKey, JSON.stringify(notification.deliveries), Number(!rescindable), ]); } const dbPromises = []; if (allInvalidTokens.length > 0) { dbPromises.push(removeInvalidTokens(allInvalidTokens)); } if (notificationRows.length > 0) { const query = SQL` INSERT INTO notifications (id, user, thread, message, collapse_key, delivery, rescinded) VALUES ${notificationRows} `; dbPromises.push(dbQuery(query)); } if (dbPromises.length > 0) { await Promise.all(dbPromises); } } async function fetchInfos(pushInfo: PushInfo) { const usersToCollapsableNotifInfo = await fetchCollapsableNotifs(pushInfo); const threadIDs = new Set(); const threadWithChangedNamesToMessages = new Map(); const addThreadIDsFromMessageInfos = (rawMessageInfo: RawMessageInfo) => { const threadID = rawMessageInfo.threadID; threadIDs.add(threadID); const messageSpec = messageSpecs[rawMessageInfo.type]; if (messageSpec.threadIDs) { for (const id of messageSpec.threadIDs(rawMessageInfo)) { threadIDs.add(id); } } if ( rawMessageInfo.type === messageTypes.CHANGE_SETTINGS && rawMessageInfo.field === 'name' ) { const messages = threadWithChangedNamesToMessages.get(threadID); if (messages) { messages.push(rawMessageInfo.id); } else { threadWithChangedNamesToMessages.set(threadID, [rawMessageInfo.id]); } } }; for (const userID in usersToCollapsableNotifInfo) { for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const rawMessageInfo of notifInfo.existingMessageInfos) { addThreadIDsFromMessageInfos(rawMessageInfo); } for (const rawMessageInfo of notifInfo.newMessageInfos) { addThreadIDsFromMessageInfos(rawMessageInfo); } } } const promises = {}; // These threadInfos won't have currentUser set - promises.threadResult = fetchServerThreadInfos( - SQL`t.id IN (${[...threadIDs]})`, - ); + promises.threadResult = fetchServerThreadInfos({ threadIDs }); if (threadWithChangedNamesToMessages.size > 0) { const typesThatAffectName = [ messageTypes.CHANGE_SETTINGS, messageTypes.CREATE_THREAD, ]; const oldNameQuery = SQL` SELECT IF( JSON_TYPE(JSON_EXTRACT(m.content, "$.name")) = 'NULL', "", JSON_UNQUOTE(JSON_EXTRACT(m.content, "$.name")) ) AS name, m.thread FROM ( SELECT MAX(id) AS id FROM messages WHERE type IN (${typesThatAffectName}) AND JSON_EXTRACT(content, "$.name") IS NOT NULL AND`; const threadClauses = []; for (const [threadID, messages] of threadWithChangedNamesToMessages) { threadClauses.push( SQL`(thread = ${threadID} AND id NOT IN (${messages}))`, ); } oldNameQuery.append(mergeOrConditions(threadClauses)); oldNameQuery.append(SQL` GROUP BY thread ) x LEFT JOIN messages m ON m.id = x.id `); promises.oldNames = dbQuery(oldNameQuery); } const { threadResult, oldNames } = await promiseAll(promises); const serverThreadInfos = threadResult.threadInfos; if (oldNames) { const [result] = oldNames; for (const row of result) { const threadID = row.thread.toString(); serverThreadInfos[threadID].name = row.name; } } const userInfos = await fetchNotifUserInfos( serverThreadInfos, usersToCollapsableNotifInfo, ); return { usersToCollapsableNotifInfo, serverThreadInfos, userInfos }; } async function fetchNotifUserInfos( serverThreadInfos: { +[threadID: string]: ServerThreadInfo }, usersToCollapsableNotifInfo: { +[userID: string]: CollapsableNotifInfo[] }, ) { const missingUserIDs = new Set(); for (const threadID in serverThreadInfos) { const serverThreadInfo = serverThreadInfos[threadID]; for (const member of serverThreadInfo.members) { missingUserIDs.add(member.id); } } const addUserIDsFromMessageInfos = (rawMessageInfo: RawMessageInfo) => { missingUserIDs.add(rawMessageInfo.creatorID); const userIDs = messageSpecs[rawMessageInfo.type].userIDs?.(rawMessageInfo) ?? []; for (const userID of userIDs) { missingUserIDs.add(userID); } }; for (const userID in usersToCollapsableNotifInfo) { missingUserIDs.add(userID); for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const rawMessageInfo of notifInfo.existingMessageInfos) { addUserIDsFromMessageInfos(rawMessageInfo); } for (const rawMessageInfo of notifInfo.newMessageInfos) { addUserIDsFromMessageInfos(rawMessageInfo); } } } return await fetchUserInfos([...missingUserIDs]); } async function createDBIDs(pushInfo: PushInfo): Promise { let numIDsNeeded = 0; for (const userID in pushInfo) { numIDsNeeded += pushInfo[userID].messageInfos.length; } return await createIDs('notifications', numIDsNeeded); } function getDevicesByPlatform( devices: $ReadOnlyArray, ): Map>> { const byPlatform = new Map(); for (const device of devices) { let innerMap = byPlatform.get(device.platform); if (!innerMap) { innerMap = new Map(); byPlatform.set(device.platform, innerMap); } const codeVersion: number = device.codeVersion !== null && device.codeVersion !== undefined && device.platform !== 'windows' && device.platform !== 'macos' ? device.codeVersion : -1; let innerMostArray = innerMap.get(codeVersion); if (!innerMostArray) { innerMostArray = []; innerMap.set(codeVersion, innerMostArray); } innerMostArray.push({ cookieID: device.cookieID, deviceToken: device.deviceToken, }); } return byPlatform; } type APNsNotifInputData = { +notifTexts: ResolvedNotifTexts, +newRawMessageInfos: RawMessageInfo[], +threadID: string, +collapseKey: ?string, +badgeOnly: boolean, +unreadCount: number, +platformDetails: PlatformDetails, }; const apnsNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, newRawMessageInfos: t.list(rawMessageInfoValidator), threadID: tID, collapseKey: t.maybe(t.String), badgeOnly: t.Boolean, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareAPNsNotification( inputData: APNsNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, apnsNotifInputDataValidator, inputData, ); const { notifTexts, newRawMessageInfos, threadID, collapseKey, badgeOnly, unreadCount, platformDetails, } = convertedData; const isTextNotification = newRawMessageInfos.every( newRawMessageInfo => newRawMessageInfo.type === messageTypes.TEXT, ); const shouldBeEncrypted = platformDetails.platform === 'ios' && !collapseKey && isTextNotification; const uniqueID = uuidv4(); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic(platformDetails); const { merged, ...rest } = notifTexts; // We don't include alert's body on macos because we // handle displaying the notification ourselves and // we don't want macOS to display it automatically. if (!badgeOnly && platformDetails.platform !== 'macos') { notification.body = merged; notification.sound = 'default'; } notification.payload = { ...notification.payload, ...rest, }; notification.badge = unreadCount; notification.threadId = threadID; notification.id = uniqueID; notification.pushType = 'alert'; notification.payload.id = uniqueID; notification.payload.threadID = threadID; if (platformDetails.codeVersion && platformDetails.codeVersion > 198) { notification.mutableContent = true; } if (collapseKey) { notification.collapseId = collapseKey; } const messageInfos = JSON.stringify(newRawMessageInfos); // We make a copy before checking notification's length, because calling // length compiles the notification and makes it immutable. Further // changes to its properties won't be reflected in the final plaintext // data that is sent. const copyWithMessageInfos = _cloneDeep(notification); copyWithMessageInfos.payload = { ...copyWithMessageInfos.payload, messageInfos, }; const evaluateAndSelectNotifPayload = (notif, notifWithMessageInfos) => { const notifWithMessageInfosCopy = _cloneDeep(notifWithMessageInfos); if ( notifWithMessageInfosCopy.length() <= apnMaxNotificationPayloadByteSize ) { return notifWithMessageInfos; } const notifCopy = _cloneDeep(notif); if (notifCopy.length() > apnMaxNotificationPayloadByteSize) { console.warn( `${platformDetails.platform} notification ${uniqueID} ` + `exceeds size limit, even with messageInfos omitted`, ); } return notif; }; const deviceTokens = devices.map(({ deviceToken }) => deviceToken); if ( shouldBeEncrypted && platformDetails.codeVersion && platformDetails.codeVersion > 222 ) { const cookieIDs = devices.map(({ cookieID }) => cookieID); const [notifications, notificationsWithMessageInfos] = await Promise.all([ prepareEncryptedIOSNotifications(cookieIDs, notification), prepareEncryptedIOSNotifications(cookieIDs, copyWithMessageInfos), ]); return notificationsWithMessageInfos.map((notif, idx) => ({ notification: evaluateAndSelectNotifPayload(notifications[idx], notif), deviceToken: deviceTokens[idx], })); } const notificationToSend = evaluateAndSelectNotifPayload( notification, copyWithMessageInfos, ); return deviceTokens.map(deviceToken => ({ notification: notificationToSend, deviceToken, })); } type AndroidNotifInputData = { ...APNsNotifInputData, +dbID: string, }; const androidNotifInputDataValidator = tShape({ ...apnsNotifInputDataValidator.meta.props, dbID: t.String, }); async function prepareAndroidNotification( inputData: AndroidNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, androidNotifInputDataValidator, inputData, ); const { notifTexts, newRawMessageInfos, threadID, collapseKey, badgeOnly, unreadCount, platformDetails: { codeVersion }, dbID, } = convertedData; const isTextNotification = newRawMessageInfos.every( newRawMessageInfo => newRawMessageInfo.type === messageTypes.TEXT, ); const shouldBeEncrypted = isTextNotification && !collapseKey && codeVersion && codeVersion > 228; const notifID = collapseKey ? collapseKey : dbID; const { merged, ...rest } = notifTexts; const notification = { data: { badge: unreadCount.toString(), ...rest, threadID, }, }; // The reason we only include `badgeOnly` for newer clients is because older // clients don't know how to parse it. The reason we only include `id` for // newer clients is that if the older clients see that field, they assume // the notif has a full payload, and then crash when trying to parse it. // By skipping `id` we allow old clients to still handle in-app notifs and // badge updating. if (!badgeOnly || (codeVersion && codeVersion >= 69)) { notification.data = { ...notification.data, id: notifID, badgeOnly: badgeOnly ? '1' : '0', }; } const messageInfos = JSON.stringify(newRawMessageInfos); const copyWithMessageInfos = { ...notification, data: { ...notification.data, messageInfos }, }; const evaluateAndSelectNotification = (notif, notifWithMessageInfos) => { if ( Buffer.byteLength(JSON.stringify(notifWithMessageInfos)) <= fcmMaxNotificationPayloadByteSize ) { return notifWithMessageInfos; } if ( Buffer.byteLength(JSON.stringify(notif)) > fcmMaxNotificationPayloadByteSize ) { console.warn( `Android notification ${notifID} exceeds size limit, even with messageInfos omitted`, ); } return notif; }; const deviceTokens = devices.map(({ deviceToken }) => deviceToken); if (shouldBeEncrypted) { const cookieIDs = devices.map(({ cookieID }) => cookieID); const [notifications, notificationsWithMessageInfos] = await Promise.all([ prepareEncryptedAndroidNotifications(cookieIDs, notification), prepareEncryptedAndroidNotifications(cookieIDs, copyWithMessageInfos), ]); return notificationsWithMessageInfos.map((notif, idx) => ({ notification: evaluateAndSelectNotification(notifications[idx], notif), deviceToken: deviceTokens[idx], })); } const notificationToSend = evaluateAndSelectNotification( notification, copyWithMessageInfos, ); return deviceTokens.map(deviceToken => ({ notification: notificationToSend, deviceToken, })); } type WebNotifInputData = { +notifTexts: ResolvedNotifTexts, +threadID: string, +unreadCount: number, +platformDetails: PlatformDetails, }; const webNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, threadID: tID, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareWebNotification( inputData: WebNotifInputData, ): Promise { const convertedData = validateOutput( inputData.platformDetails, webNotifInputDataValidator, inputData, ); const { notifTexts, threadID, unreadCount } = convertedData; const id = uuidv4(); const { merged, ...rest } = notifTexts; const notification = { ...rest, unreadCount, id, threadID, }; return notification; } type WNSNotifInputData = { +notifTexts: ResolvedNotifTexts, +threadID: string, +unreadCount: number, +platformDetails: PlatformDetails, }; const wnsNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, threadID: tID, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareWNSNotification( inputData: WNSNotifInputData, ): Promise { const convertedData = validateOutput( inputData.platformDetails, wnsNotifInputDataValidator, inputData, ); const { notifTexts, threadID, unreadCount } = convertedData; const { merged, ...rest } = notifTexts; const notification = { ...rest, unreadCount, threadID, }; if ( Buffer.byteLength(JSON.stringify(notification)) > wnsMaxNotificationPayloadByteSize ) { console.warn('WNS notification exceeds size limit'); } return notification; } type NotificationInfo = | { +source: 'new_message', +dbID: string, +userID: string, +threadID: string, +messageID: string, +collapseKey: ?string, +codeVersion: number, } | { +source: 'mark_as_unread' | 'mark_as_read' | 'activity_update', +dbID: string, +userID: string, +codeVersion: number, }; type APNsDelivery = { source: $PropertyType, deviceType: 'ios' | 'macos', iosID: string, deviceTokens: $ReadOnlyArray, codeVersion: number, errors?: $ReadOnlyArray, }; type APNsResult = { info: NotificationInfo, delivery: APNsDelivery, invalidTokens?: $ReadOnlyArray, }; async function sendAPNsNotification( platform: 'ios' | 'macos', targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion } = notificationInfo; const response = await apnPush({ targetedNotifications, platformDetails: { platform, codeVersion }, }); invariant( new Set(targetedNotifications.map(({ notification }) => notification.id)) .size === 1, 'Encrypted versions of the same notification must share id value', ); const iosID = targetedNotifications[0].notification.id; const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const delivery: APNsDelivery = { source, deviceType: platform, iosID, deviceTokens, codeVersion, }; if (response.errors) { delivery.errors = response.errors; } const result: APNsResult = { info: notificationInfo, delivery, }; if (response.invalidTokens) { result.invalidTokens = response.invalidTokens; } return result; } type PushResult = AndroidResult | APNsResult | WebResult | WNSResult; type PushDelivery = AndroidDelivery | APNsDelivery | WebDelivery | WNSDelivery; type AndroidDelivery = { source: $PropertyType, deviceType: 'android', androidIDs: $ReadOnlyArray, deviceTokens: $ReadOnlyArray, codeVersion: number, errors?: $ReadOnlyArray, }; type AndroidResult = { info: NotificationInfo, delivery: AndroidDelivery, invalidTokens?: $ReadOnlyArray, }; async function sendAndroidNotification( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const collapseKey = notificationInfo.collapseKey ? notificationInfo.collapseKey : null; // for Flow... const { source, codeVersion } = notificationInfo; const response = await fcmPush({ targetedNotifications, collapseKey, codeVersion, }); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const androidIDs = response.fcmIDs ? response.fcmIDs : []; const delivery: AndroidDelivery = { source, deviceType: 'android', androidIDs, deviceTokens, codeVersion, }; if (response.errors) { delivery.errors = response.errors; } const result: AndroidResult = { info: notificationInfo, delivery, }; if (response.invalidTokens) { result.invalidTokens = response.invalidTokens; } return result; } type WebDelivery = { +source: $PropertyType, +deviceType: 'web', +deviceTokens: $ReadOnlyArray, +codeVersion?: number, +errors?: $ReadOnlyArray, }; type WebResult = { +info: NotificationInfo, +delivery: WebDelivery, +invalidTokens?: $ReadOnlyArray, }; async function sendWebNotification( notification: WebNotification, deviceTokens: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion } = notificationInfo; const response = await webPush({ notification, deviceTokens, }); const delivery: WebDelivery = { source, deviceType: 'web', deviceTokens, codeVersion, errors: response.errors, }; const result: WebResult = { info: notificationInfo, delivery, invalidTokens: response.invalidTokens, }; return result; } type WNSDelivery = { +source: $PropertyType, +deviceType: 'windows', +wnsIDs: $ReadOnlyArray, +deviceTokens: $ReadOnlyArray, +codeVersion?: number, +errors?: $ReadOnlyArray, }; type WNSResult = { +info: NotificationInfo, +delivery: WNSDelivery, +invalidTokens?: $ReadOnlyArray, }; async function sendWNSNotification( notification: WNSNotification, deviceTokens: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion } = notificationInfo; const response = await wnsPush({ notification, deviceTokens, }); const wnsIDs = response.wnsIDs ?? []; const delivery: WNSDelivery = { source, deviceType: 'windows', wnsIDs, deviceTokens, codeVersion, errors: response.errors, }; const result: WNSResult = { info: notificationInfo, delivery, invalidTokens: response.invalidTokens, }; return result; } type InvalidToken = { +userID: string, +tokens: $ReadOnlyArray, }; async function removeInvalidTokens( invalidTokens: $ReadOnlyArray, ): Promise { const sqlTuples = invalidTokens.map( invalidTokenUser => SQL`( user = ${invalidTokenUser.userID} AND device_token IN (${invalidTokenUser.tokens}) )`, ); const sqlCondition = mergeOrConditions(sqlTuples); const selectQuery = SQL` SELECT id, user, device_token FROM cookies WHERE `; selectQuery.append(sqlCondition); const [result] = await dbQuery(selectQuery); const userCookiePairsToInvalidDeviceTokens = new Map(); for (const row of result) { const userCookiePair = `${row.user}|${row.id}`; const existing = userCookiePairsToInvalidDeviceTokens.get(userCookiePair); if (existing) { existing.add(row.device_token); } else { userCookiePairsToInvalidDeviceTokens.set( userCookiePair, new Set([row.device_token]), ); } } const time = Date.now(); const promises = []; for (const entry of userCookiePairsToInvalidDeviceTokens) { const [userCookiePair, deviceTokens] = entry; const [userID, cookieID] = userCookiePair.split('|'); const updateDatas = [...deviceTokens].map(deviceToken => ({ type: updateTypes.BAD_DEVICE_TOKEN, userID, time, deviceToken, targetCookie: cookieID, })); promises.push(createUpdates(updateDatas)); } const updateQuery = SQL` UPDATE cookies SET device_token = NULL WHERE `; updateQuery.append(sqlCondition); promises.push(dbQuery(updateQuery)); await Promise.all(promises); } async function updateBadgeCount( viewer: Viewer, source: 'mark_as_unread' | 'mark_as_read' | 'activity_update', ) { const { userID } = viewer; const deviceTokenQuery = SQL` SELECT platform, device_token, versions, id FROM cookies WHERE user = ${userID} AND device_token IS NOT NULL `; if (viewer.data.cookieID) { deviceTokenQuery.append(SQL`AND id != ${viewer.cookieID} `); } const [unreadCounts, [deviceTokenResult], [dbID]] = await Promise.all([ getUnreadCounts([userID]), dbQuery(deviceTokenQuery), createIDs('notifications', 1), ]); const unreadCount = unreadCounts[userID]; const devices = deviceTokenResult.map(row => ({ platform: row.platform, cookieID: row.id, deviceToken: row.device_token, codeVersion: JSON.parse(row.versions)?.codeVersion, })); const byPlatform = getDevicesByPlatform(devices); const deliveryPromises = []; const iosVersionsToTokens = byPlatform.get('ios'); if (iosVersionsToTokens) { for (const [codeVersion, deviceInfos] of iosVersionsToTokens) { const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'ios', codeVersion, }); notification.badge = unreadCount; notification.pushType = 'alert'; const deliveryPromise = (async () => { const cookieIDs = deviceInfos.map(({ cookieID }) => cookieID); let notificationsArray; if (codeVersion > 222) { notificationsArray = await prepareEncryptedIOSNotifications( cookieIDs, notification, ); } else { notificationsArray = cookieIDs.map(() => notification); } const targetedNotifications = deviceInfos.map( ({ deviceToken }, idx) => ({ deviceToken, notification: notificationsArray[idx], }), ); return await sendAPNsNotification('ios', targetedNotifications, { source, dbID, userID, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } const androidVersionsToTokens = byPlatform.get('android'); if (androidVersionsToTokens) { for (const [codeVersion, deviceInfos] of androidVersionsToTokens) { const notificationData = codeVersion < 69 ? { badge: unreadCount.toString() } : { badge: unreadCount.toString(), badgeOnly: '1' }; const notification = { data: notificationData }; const deliveryPromise = (async () => { const cookieIDs = deviceInfos.map(({ cookieID }) => cookieID); let notificationsArray; if (codeVersion > 222) { notificationsArray = await prepareEncryptedAndroidNotifications( cookieIDs, notification, ); } else { notificationsArray = cookieIDs.map(() => notification); } const targetedNotifications = deviceInfos.map( ({ deviceToken }, idx) => ({ deviceToken, notification: notificationsArray[idx], }), ); return await sendAndroidNotification(targetedNotifications, { source, dbID, userID, codeVersion, }); })(); deliveryPromises.push(deliveryPromise); } } const macosVersionsToTokens = byPlatform.get('macos'); if (macosVersionsToTokens) { for (const [codeVersion, deviceInfos] of macosVersionsToTokens) { const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'macos', codeVersion, }); notification.badge = unreadCount; notification.pushType = 'alert'; const targetedNotifications = deviceInfos.map(({ deviceToken }) => ({ deviceToken, notification, })); deliveryPromises.push( sendAPNsNotification('macos', targetedNotifications, { source, dbID, userID, codeVersion, }), ); } } const deliveryResults = await Promise.all(deliveryPromises); await saveNotifResults(deliveryResults, new Map(), false); } export { sendPushNotifs, sendRescindNotifs, updateBadgeCount }; diff --git a/keyserver/src/responders/message-responders.js b/keyserver/src/responders/message-responders.js index 28b61925b..07b814204 100644 --- a/keyserver/src/responders/message-responders.js +++ b/keyserver/src/responders/message-responders.js @@ -1,525 +1,525 @@ // @flow import invariant from 'invariant'; import t, { type TInterface } from 'tcomb'; import { onlyOneEmojiRegex } from 'lib/shared/emojis.js'; import { createMediaMessageData, trimMessage, } from 'lib/shared/message-utils.js'; import { relationshipBlockedInEitherDirection } from 'lib/shared/relationship-utils.js'; import type { Media } from 'lib/types/media-types.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import { type SendTextMessageRequest, type SendMultimediaMessageRequest, type SendReactionMessageRequest, type SendEditMessageRequest, type FetchMessageInfosResponse, type FetchMessageInfosRequest, defaultNumberPerThread, type SendMessageResponse, type SendEditMessageResponse, type FetchPinnedMessagesRequest, type FetchPinnedMessagesResult, messageTruncationStatusesValidator, rawMessageInfoValidator, type SearchMessagesResponse, type SearchMessagesRequest, } from 'lib/types/message-types.js'; import type { EditMessageData } from 'lib/types/messages/edit.js'; import type { ReactionMessageData } from 'lib/types/messages/reaction.js'; import type { TextMessageData } from 'lib/types/messages/text.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { userInfosValidator } from 'lib/types/user-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { tRegex, tShape, tMediaMessageMedia, tID, } from 'lib/utils/validation-utils.js'; import createMessages from '../creators/message-creator.js'; -import { SQL } from '../database/database.js'; import { fetchMessageInfos, fetchMessageInfoForLocalID, fetchMessageInfoByID, fetchThreadMessagesCount, fetchPinnedMessageInfos, searchMessagesInSingleChat, } from '../fetchers/message-fetchers.js'; import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; import { checkThreadPermission } from '../fetchers/thread-permission-fetchers.js'; import { fetchImages, fetchMediaFromMediaMessageContent, } from '../fetchers/upload-fetchers.js'; import { fetchKnownUserInfos } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { assignImages, assignMessageContainerToMedia, } from '../updaters/upload-updaters.js'; import { validateInput, validateOutput } from '../utils/validation-utils.js'; const sendTextMessageRequestInputValidator = tShape({ threadID: tID, localID: t.maybe(t.String), text: t.String, sidebarCreation: t.maybe(t.Boolean), }); export const sendMessageResponseValidator: TInterface = tShape({ newMessageInfo: rawMessageInfoValidator }); async function textMessageCreationResponder( viewer: Viewer, input: mixed, ): Promise { const request = await validateInput( viewer, sendTextMessageRequestInputValidator, input, ); const { threadID, localID, text: rawText, sidebarCreation } = request; const text = trimMessage(rawText); if (!text) { throw new ServerError('invalid_parameters'); } const hasPermission = await checkThreadPermission( viewer, threadID, threadPermissions.VOICED, ); if (!hasPermission) { throw new ServerError('invalid_parameters'); } let messageData: TextMessageData = { type: messageTypes.TEXT, threadID, creatorID: viewer.id, time: Date.now(), text, }; if (localID) { messageData = { ...messageData, localID }; } if (sidebarCreation) { const numMessages = await fetchThreadMessagesCount(threadID); if (numMessages === 2) { // sidebarCreation is set below to prevent double notifs from a sidebar // creation. We expect precisely two messages to appear before a // sidebarCreation: a SIDEBAR_SOURCE and a CREATE_SIDEBAR. If two users // attempt to create a sidebar at the same time, then both clients will // attempt to set sidebarCreation here, but we only want to suppress // notifs for the client that won the race. messageData = { ...messageData, sidebarCreation }; } } const rawMessageInfos = await createMessages(viewer, [messageData]); const response = { newMessageInfo: rawMessageInfos[0] }; return validateOutput( viewer.platformDetails, sendMessageResponseValidator, response, ); } const fetchMessageInfosRequestInputValidator = tShape( { cursors: t.dict(tID, t.maybe(tID)), numberPerThread: t.maybe(t.Number), }, ); export const fetchMessageInfosResponseValidator: TInterface = tShape({ rawMessageInfos: t.list(rawMessageInfoValidator), truncationStatuses: messageTruncationStatusesValidator, userInfos: userInfosValidator, }); async function messageFetchResponder( viewer: Viewer, input: mixed, ): Promise { const request = await validateInput( viewer, fetchMessageInfosRequestInputValidator, input, ); const response = await fetchMessageInfos( viewer, { threadCursors: request.cursors }, request.numberPerThread ? request.numberPerThread : defaultNumberPerThread, ); return validateOutput( viewer.platformDetails, fetchMessageInfosResponseValidator, { ...response, userInfos: {}, }, ); } const sendMultimediaMessageRequestInputValidator = t.union([ // This option is only used for messageTypes.IMAGES tShape({ threadID: tID, localID: t.String, sidebarCreation: t.maybe(t.Boolean), mediaIDs: t.list(tID), }), tShape({ threadID: tID, localID: t.String, sidebarCreation: t.maybe(t.Boolean), mediaMessageContents: t.list(tMediaMessageMedia), }), ]); async function multimediaMessageCreationResponder( viewer: Viewer, input: mixed, ): Promise { const request = await validateInput( viewer, sendMultimediaMessageRequestInputValidator, input, ); if ( (request.mediaIDs && request.mediaIDs.length === 0) || (request.mediaMessageContents && request.mediaMessageContents.length === 0) ) { throw new ServerError('invalid_parameters'); } const { threadID, localID, sidebarCreation } = request; const hasPermission = await checkThreadPermission( viewer, threadID, threadPermissions.VOICED, ); if (!hasPermission) { throw new ServerError('invalid_parameters'); } const existingMessageInfoPromise = fetchMessageInfoForLocalID( viewer, localID, ); const mediaPromise: Promise<$ReadOnlyArray> = request.mediaIDs ? fetchImages(viewer, request.mediaIDs) : fetchMediaFromMediaMessageContent(viewer, request.mediaMessageContents); const [existingMessageInfo, media] = await Promise.all([ existingMessageInfoPromise, mediaPromise, ]); if (media.length === 0 && !existingMessageInfo) { throw new ServerError('invalid_parameters'); } // We use the MULTIMEDIA type for encrypted photos const containsEncryptedMedia = media.some( m => m.type === 'encrypted_photo' || m.type === 'encrypted_video', ); const messageData = createMediaMessageData( { localID, threadID, creatorID: viewer.id, media, sidebarCreation, }, { forceMultimediaMessageType: containsEncryptedMedia }, ); const [newMessageInfo] = await createMessages(viewer, [messageData]); const { id } = newMessageInfo; invariant( id !== null && id !== undefined, 'serverID should be set in createMessages result', ); if (request.mediaIDs) { await assignImages(viewer, request.mediaIDs, id, threadID); } else { await assignMessageContainerToMedia( viewer, request.mediaMessageContents, id, threadID, ); } const response = { newMessageInfo }; return validateOutput( viewer.platformDetails, sendMessageResponseValidator, response, ); } const sendReactionMessageRequestInputValidator = tShape({ threadID: tID, localID: t.maybe(t.String), targetMessageID: tID, reaction: tRegex(onlyOneEmojiRegex), action: t.enums.of(['add_reaction', 'remove_reaction']), }); async function reactionMessageCreationResponder( viewer: Viewer, input: mixed, ): Promise { const request = await validateInput( viewer, sendReactionMessageRequestInputValidator, input, ); const { threadID, localID, targetMessageID, reaction, action } = request; if (!targetMessageID || !reaction) { throw new ServerError('invalid_parameters'); } const targetMessageInfo = await fetchMessageInfoByID(viewer, targetMessageID); if (!targetMessageInfo || !targetMessageInfo.id) { throw new ServerError('invalid_parameters'); } const [serverThreadInfos, hasPermission, targetMessageUserInfos] = await Promise.all([ - fetchServerThreadInfos(SQL`t.id = ${threadID}`), + fetchServerThreadInfos({ threadID }), checkThreadPermission( viewer, threadID, threadPermissions.REACT_TO_MESSAGE, ), fetchKnownUserInfos(viewer, [targetMessageInfo.creatorID]), ]); const targetMessageThreadInfo = serverThreadInfos.threadInfos[threadID]; if (targetMessageThreadInfo.sourceMessageID === targetMessageID) { throw new ServerError('invalid_parameters'); } const targetMessageCreator = targetMessageUserInfos[targetMessageInfo.creatorID]; const targetMessageCreatorRelationship = targetMessageCreator?.relationshipStatus; const creatorRelationshipHasBlock = targetMessageCreatorRelationship && relationshipBlockedInEitherDirection(targetMessageCreatorRelationship); if (!hasPermission || creatorRelationshipHasBlock) { throw new ServerError('invalid_parameters'); } let messageData: ReactionMessageData = { type: messageTypes.REACTION, threadID, creatorID: viewer.id, time: Date.now(), targetMessageID, reaction, action, }; if (localID) { messageData = { ...messageData, localID }; } const rawMessageInfos = await createMessages(viewer, [messageData]); const response = { newMessageInfo: rawMessageInfos[0] }; return validateOutput( viewer.platformDetails, sendMessageResponseValidator, response, ); } const editMessageRequestInputValidator = tShape({ targetMessageID: tID, text: t.String, }); export const sendEditMessageResponseValidator: TInterface = tShape({ newMessageInfos: t.list(rawMessageInfoValidator), }); async function editMessageCreationResponder( viewer: Viewer, input: mixed, ): Promise { const request = await validateInput( viewer, editMessageRequestInputValidator, input, ); const { targetMessageID, text: rawText } = request; const text = trimMessage(rawText); if (!targetMessageID || !text) { throw new ServerError('invalid_parameters'); } const targetMessageInfo = await fetchMessageInfoByID(viewer, targetMessageID); if (!targetMessageInfo || !targetMessageInfo.id) { throw new ServerError('invalid_parameters'); } if (targetMessageInfo.type !== messageTypes.TEXT) { throw new ServerError('invalid_parameters'); } const { threadID } = targetMessageInfo; const [serverThreadInfos, hasPermission, rawSidebarThreadInfos] = await Promise.all([ - fetchServerThreadInfos(SQL`t.id = ${threadID}`), + fetchServerThreadInfos({ threadID }), checkThreadPermission(viewer, threadID, threadPermissions.EDIT_MESSAGE), - fetchServerThreadInfos( - SQL`t.parent_thread_id = ${threadID} AND t.source_message = ${targetMessageID}`, - ), + fetchServerThreadInfos({ + parentThreadID: threadID, + sourceMessageID: targetMessageID, + }), ]); const targetMessageThreadInfo = serverThreadInfos.threadInfos[threadID]; if (targetMessageThreadInfo.sourceMessageID === targetMessageID) { // We are editing first message of the sidebar // If client wants to do that it sends id of the sourceMessage instead throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_parameters'); } if (targetMessageInfo.creatorID !== viewer.id) { throw new ServerError('invalid_parameters'); } const time = Date.now(); const messagesData = []; let messageData: EditMessageData = { type: messageTypes.EDIT_MESSAGE, threadID, creatorID: viewer.id, time, targetMessageID, text, }; messagesData.push(messageData); const sidebarThreadValues = values(rawSidebarThreadInfos.threadInfos); for (const sidebarThreadValue of sidebarThreadValues) { if (sidebarThreadValue && sidebarThreadValue.id) { messageData = { type: messageTypes.EDIT_MESSAGE, threadID: sidebarThreadValue.id, creatorID: viewer.id, time, targetMessageID, text: text, }; messagesData.push(messageData); } } const newMessageInfos = await createMessages(viewer, messagesData); const response = { newMessageInfos }; return validateOutput( viewer.platformDetails, sendEditMessageResponseValidator, response, ); } const fetchPinnedMessagesResponderInputValidator = tShape({ threadID: tID, }); export const fetchPinnedMessagesResultValidator: TInterface = tShape({ pinnedMessages: t.list(rawMessageInfoValidator), }); async function fetchPinnedMessagesResponder( viewer: Viewer, input: mixed, ): Promise { const request = await validateInput( viewer, fetchPinnedMessagesResponderInputValidator, input, ); const response = await fetchPinnedMessageInfos(viewer, request); return validateOutput( viewer.platformDetails, fetchPinnedMessagesResultValidator, response, ); } const searchMessagesResponderInputValidator = tShape({ query: t.String, threadID: t.String, cursor: t.maybe(t.String), }); const searchMessagesResponseValidator: TInterface = tShape({ messages: t.list(rawMessageInfoValidator), endReached: t.Boolean, }); async function searchMessagesResponder( viewer: Viewer, input: mixed, ): Promise { const request: SearchMessagesRequest = await validateInput( viewer, searchMessagesResponderInputValidator, input, ); const response = await searchMessagesInSingleChat( request.query, request.threadID, viewer, request.cursor, ); return validateOutput( viewer.platformDetails, searchMessagesResponseValidator, response, ); } export { textMessageCreationResponder, messageFetchResponder, multimediaMessageCreationResponder, reactionMessageCreationResponder, editMessageCreationResponder, fetchPinnedMessagesResponder, searchMessagesResponder, }; diff --git a/keyserver/src/scripts/add-thread-ancestry.js b/keyserver/src/scripts/add-thread-ancestry.js deleted file mode 100644 index e3101e40b..000000000 --- a/keyserver/src/scripts/add-thread-ancestry.js +++ /dev/null @@ -1,84 +0,0 @@ -// @flow - -import { - getContainingThreadID, - getCommunity, -} from 'lib/shared/thread-utils.js'; -import type { ServerThreadInfo } from 'lib/types/thread-types.js'; - -import { main } from './utils.js'; -import { dbQuery, SQL } from '../database/database.js'; -import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; - -async function addColumnAndIndexes() { - await dbQuery(SQL` - ALTER TABLE threads - ADD containing_thread_id BIGINT(20) NULL AFTER parent_thread_id, - ADD community BIGINT(20) NULL AFTER containing_thread_id, - ADD depth INT UNSIGNED NOT NULL DEFAULT 0 AFTER community, - ADD INDEX parent_thread_id (parent_thread_id), - ADD INDEX community (community), - ADD INDEX containing_thread_id (containing_thread_id); - `); -} - -async function setColumn() { - const stack = [[null, SQL`t.parent_thread_id IS NULL`]]; - - while (stack.length > 0) { - const [parentThreadInfo, predicate] = stack.shift(); - const { threadInfos } = await fetchServerThreadInfos(predicate); - const updatedThreadInfos = await setColumnForLayer( - parentThreadInfo, - threadInfos, - ); - for (const threadInfo of updatedThreadInfos) { - stack.push([threadInfo, SQL`t.parent_thread_id = ${threadInfo.id}`]); - } - } -} - -async function setColumnForLayer( - parentThreadInfo: ?ServerThreadInfo, - threadInfos: { +[id: string]: ServerThreadInfo }, -): Promise { - const updatedThreadInfos = []; - for (const threadID in threadInfos) { - const threadInfo = threadInfos[threadID]; - const containingThreadID = getContainingThreadID( - parentThreadInfo, - threadInfo.type, - ); - const community = getCommunity(parentThreadInfo); - if (!containingThreadID && !community) { - console.log( - `containingThreadID and community are null for ${threadID}, ` + - 'skipping...', - ); - updatedThreadInfos.push(threadInfo); - continue; - } - const depth = parentThreadInfo ? parentThreadInfo.depth + 1 : 0; - console.log( - `setting containingThreadID to ${containingThreadID ?? 'null'}, ` + - `community to ${community ?? 'null'}, and ` + - `depth to ${depth} for ${threadID}`, - ); - await dbQuery(SQL` - UPDATE threads - SET containing_thread_id = ${containingThreadID}, - community = ${community}, - depth = ${depth} - WHERE id = ${threadID} - `); - updatedThreadInfos.push({ - ...threadInfo, - containingThreadID, - community, - depth, - }); - } - return updatedThreadInfos; -} - -main([addColumnAndIndexes, setColumn]); diff --git a/keyserver/src/scripts/soft-launch-migration.js b/keyserver/src/scripts/soft-launch-migration.js deleted file mode 100644 index edbac0f47..000000000 --- a/keyserver/src/scripts/soft-launch-migration.js +++ /dev/null @@ -1,430 +0,0 @@ -// @flow - -import invariant from 'invariant'; - -import ashoat from 'lib/facts/ashoat.js'; -import bots from 'lib/facts/bots.js'; -import genesis from 'lib/facts/genesis.js'; -import testers from 'lib/facts/testers.js'; -import { messageTypes } from 'lib/types/message-types-enum.js'; -import { threadTypes, type ThreadType } from 'lib/types/thread-types-enum.js'; - -import { main } from './utils.js'; -import createMessages from '../creators/message-creator.js'; -import { createThread } from '../creators/thread-creator.js'; -import { dbQuery, SQL } from '../database/database.js'; -import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; -import { fetchAllUserIDs } from '../fetchers/user-fetchers.js'; -import { createScriptViewer } from '../session/scripts.js'; -import type { Viewer } from '../session/viewer.js'; -import { - recalculateThreadPermissions, - commitMembershipChangeset, - saveMemberships, -} from '../updaters/thread-permission-updaters.js'; -import { updateThread } from '../updaters/thread-updaters.js'; - -const batchSize = 10; -const createThreadOptions = { forceAddMembers: true }; -const updateThreadOptions = { - forceUpdateRoot: true, - silenceMessages: true, - ignorePermissions: true, -}; -const convertUnadminnedToCommunities = ['311733', '421638']; -const convertToAnnouncementCommunities = ['375310']; -const convertToAnnouncementSubthreads = ['82649']; -const threadsWithMissingParent = ['534395']; -const personalThreadsWithMissingMembers = [ - '82161', - '103111', - '210609', - '227049', -]; -const excludeFromTestersThread = new Set([ - '1402', - '39227', - '156159', - '526973', - '740732', -]); - -async function createGenesisCommunity() { - const genesisThreadInfos = await fetchServerThreadInfos( - SQL`t.id = ${genesis.id}`, - ); - const genesisThreadInfo = genesisThreadInfos.threadInfos[genesis.id]; - if (genesisThreadInfo && genesisThreadInfo.type === threadTypes.GENESIS) { - return; - } else if (genesisThreadInfo) { - await updateGenesisCommunityType(); - return; - } - - console.log('creating GENESIS community'); - - const idInsertQuery = SQL` - INSERT INTO ids(id, table_name) - VALUES ${[[genesis.id, 'threads']]} - `; - await dbQuery(idInsertQuery); - - const ashoatViewer = createScriptViewer(ashoat.id); - const allUserIDs = await fetchAllUserIDs(); - const nonAshoatUserIDs = allUserIDs.filter(id => id !== ashoat.id); - - await createThread( - ashoatViewer, - { - id: genesis.id, - type: threadTypes.GENESIS, - name: genesis.name, - description: genesis.description, - initialMemberIDs: nonAshoatUserIDs, - }, - createThreadOptions, - ); - - await createMessages( - ashoatViewer, - genesis.introMessages.map(message => ({ - type: messageTypes.TEXT, - threadID: genesis.id, - creatorID: ashoat.id, - time: Date.now(), - text: message, - })), - ); - - console.log('creating testers thread'); - - const testerUserIDs = nonAshoatUserIDs.filter( - userID => !excludeFromTestersThread.has(userID), - ); - const { newThreadID } = await createThread( - ashoatViewer, - { - type: threadTypes.COMMUNITY_SECRET_SUBTHREAD, - name: testers.name, - description: testers.description, - initialMemberIDs: testerUserIDs, - }, - createThreadOptions, - ); - invariant( - newThreadID, - 'newThreadID for tester thread creation should be set', - ); - - await createMessages( - ashoatViewer, - testers.introMessages.map(message => ({ - type: messageTypes.TEXT, - threadID: newThreadID, - creatorID: ashoat.id, - time: Date.now(), - text: message, - })), - ); -} - -async function updateGenesisCommunityType() { - console.log('updating GENESIS community to GENESIS type'); - - const ashoatViewer = createScriptViewer(ashoat.id); - await updateThread( - ashoatViewer, - { - threadID: genesis.id, - changes: { - type: threadTypes.GENESIS, - }, - }, - updateThreadOptions, - ); -} - -async function convertExistingCommunities() { - const communityQuery = SQL` - SELECT t.id, t.name - FROM threads t - LEFT JOIN roles r ON r.thread = t.id - LEFT JOIN memberships m ON m.thread = t.id - WHERE t.type = ${threadTypes.COMMUNITY_SECRET_SUBTHREAD} - AND t.parent_thread_id IS NULL - GROUP BY t.id - HAVING COUNT(DISTINCT r.id) > 1 AND COUNT(DISTINCT m.user) > 2 - `; - const [convertToCommunity] = await dbQuery(communityQuery); - - const botViewer = createScriptViewer(bots.commbot.userID); - await convertThreads( - botViewer, - convertToCommunity, - threadTypes.COMMUNITY_ROOT, - ); -} - -async function convertThreads( - viewer: Viewer, - threads: Array<{ +id: number, +name: string }>, - type: ThreadType, -) { - while (threads.length > 0) { - const batch = threads.splice(0, batchSize); - await Promise.all( - batch.map(async thread => { - console.log(`converting ${JSON.stringify(thread)} to ${type}`); - return await updateThread( - viewer, - { - threadID: thread.id.toString(), - changes: { type }, - }, - updateThreadOptions, - ); - }), - ); - } -} - -async function convertUnadminnedCommunities() { - const communityQuery = SQL` - SELECT id, name - FROM threads - WHERE id IN (${convertUnadminnedToCommunities}) AND - type = ${threadTypes.COMMUNITY_SECRET_SUBTHREAD} - `; - const [convertToCommunity] = await dbQuery(communityQuery); - - // We use ashoat here to make sure he becomes the admin of these communities - const ashoatViewer = createScriptViewer(ashoat.id); - await convertThreads( - ashoatViewer, - convertToCommunity, - threadTypes.COMMUNITY_ROOT, - ); -} - -async function convertAnnouncementCommunities() { - const announcementCommunityQuery = SQL` - SELECT id, name - FROM threads - WHERE id IN (${convertToAnnouncementCommunities}) AND - type != ${threadTypes.COMMUNITY_ANNOUNCEMENT_ROOT} - `; - const [convertToAnnouncementCommunity] = await dbQuery( - announcementCommunityQuery, - ); - - const botViewer = createScriptViewer(bots.commbot.userID); - await convertThreads( - botViewer, - convertToAnnouncementCommunity, - threadTypes.COMMUNITY_ANNOUNCEMENT_ROOT, - ); -} - -async function convertAnnouncementSubthreads() { - const announcementSubthreadQuery = SQL` - SELECT id, name - FROM threads - WHERE id IN (${convertToAnnouncementSubthreads}) AND - type != ${threadTypes.COMMUNITY_OPEN_ANNOUNCEMENT_SUBTHREAD} - `; - const [convertToAnnouncementSubthread] = await dbQuery( - announcementSubthreadQuery, - ); - - const botViewer = createScriptViewer(bots.commbot.userID); - await convertThreads( - botViewer, - convertToAnnouncementSubthread, - threadTypes.COMMUNITY_OPEN_ANNOUNCEMENT_SUBTHREAD, - ); -} - -async function fixThreadsWithMissingParent() { - const threadsWithMissingParentQuery = SQL` - SELECT id, name - FROM threads - WHERE id IN (${threadsWithMissingParent}) AND - type != ${threadTypes.COMMUNITY_SECRET_SUBTHREAD} - `; - const [threadsWithMissingParentResult] = await dbQuery( - threadsWithMissingParentQuery, - ); - - const botViewer = createScriptViewer(bots.commbot.userID); - while (threadsWithMissingParentResult.length > 0) { - const batch = threadsWithMissingParentResult.splice(0, batchSize); - await Promise.all( - batch.map(async thread => { - console.log(`fixing ${JSON.stringify(thread)} with missing parent`); - return await updateThread( - botViewer, - { - threadID: thread.id.toString(), - changes: { - parentThreadID: null, - type: threadTypes.COMMUNITY_SECRET_SUBTHREAD, - }, - }, - updateThreadOptions, - ); - }), - ); - } -} - -async function fixPersonalThreadsWithMissingMembers() { - const missingMembersQuery = SQL` - SELECT thread, user - FROM memberships - WHERE thread IN (${personalThreadsWithMissingMembers}) AND role <= 0 - `; - const [missingMembers] = await dbQuery(missingMembersQuery); - - const botViewer = createScriptViewer(bots.commbot.userID); - for (const row of missingMembers) { - console.log(`fixing ${JSON.stringify(row)} with missing member`); - await updateThread( - botViewer, - { - threadID: row.thread.toString(), - changes: { - newMemberIDs: [row.user.toString()], - }, - }, - updateThreadOptions, - ); - } -} - -async function moveThreadsToGenesis() { - const noParentQuery = SQL` - SELECT id, name - FROM threads - WHERE type != ${threadTypes.COMMUNITY_ROOT} - AND type != ${threadTypes.COMMUNITY_ANNOUNCEMENT_ROOT} - AND type != ${threadTypes.GENESIS} - AND parent_thread_id IS NULL - `; - const [noParentThreads] = await dbQuery(noParentQuery); - - const botViewer = createScriptViewer(bots.commbot.userID); - while (noParentThreads.length > 0) { - const batch = noParentThreads.splice(0, batchSize); - await Promise.all( - batch.map(async thread => { - console.log(`processing ${JSON.stringify(thread)}`); - return await updateThread( - botViewer, - { - threadID: thread.id.toString(), - changes: { - parentThreadID: genesis.id, - }, - }, - updateThreadOptions, - ); - }), - ); - } - - const childQuery = SQL` - SELECT id, name - FROM threads - WHERE type != ${threadTypes.COMMUNITY_ROOT} - AND type != ${threadTypes.COMMUNITY_ANNOUNCEMENT_ROOT} - AND type != ${threadTypes.GENESIS} - AND parent_thread_id IS NOT NULL - AND parent_thread_id != ${genesis.id} - `; - const [childThreads] = await dbQuery(childQuery); - - for (const childThread of childThreads) { - // We go one by one because the changes in a parent thread can affect a - // child thread. If the child thread update starts at the same time as an - // update for its parent thread, a race can cause incorrect results for the - // child thread (in particular for the permissions on the memberships table) - console.log(`processing ${JSON.stringify(childThread)}`); - await updateThread( - botViewer, - { - threadID: childThread.id.toString(), - changes: {}, - }, - updateThreadOptions, - ); - } -} - -async function clearMembershipPermissions() { - const membershipPermissionQuery = SQL` - SELECT DISTINCT thread - FROM memberships - WHERE JSON_EXTRACT(permissions, '$.membership') IS NOT NULL - `; - const [membershipPermissionResult] = await dbQuery(membershipPermissionQuery); - if (membershipPermissionResult.length === 0) { - return; - } - - const botViewer = createScriptViewer(bots.commbot.userID); - for (const row of membershipPermissionResult) { - const threadID = row.thread.toString(); - console.log(`clearing membership permissions for ${threadID}`); - const changeset = await recalculateThreadPermissions(threadID); - await commitMembershipChangeset(botViewer, changeset); - } - - console.log('clearing -1 rows...'); - const emptyMembershipDeletionQuery = SQL` - DELETE FROM memberships - WHERE role = -1 AND permissions IS NULL - `; - await dbQuery(emptyMembershipDeletionQuery); - - await createMembershipsForFormerMembers(); -} - -async function createMembershipsForFormerMembers() { - const [result] = await dbQuery(SQL` - SELECT DISTINCT thread, user - FROM messages m - WHERE NOT EXISTS ( - SELECT thread, user FROM memberships mm - WHERE m.thread = mm.thread AND m.user = mm.user - ) - `); - - const rowsToSave = []; - for (const row of result) { - rowsToSave.push({ - operation: 'save', - userID: row.user.toString(), - threadID: row.thread.toString(), - userNeedsFullThreadDetails: false, - intent: 'none', - permissions: null, - permissionsForChildren: null, - role: '-1', - oldRole: '-1', - }); - } - - await saveMemberships(rowsToSave); -} - -main([ - createGenesisCommunity, - convertExistingCommunities, - convertUnadminnedCommunities, - convertAnnouncementCommunities, - convertAnnouncementSubthreads, - fixThreadsWithMissingParent, - fixPersonalThreadsWithMissingMembers, - moveThreadsToGenesis, - clearMembershipPermissions, -]); diff --git a/keyserver/src/socket/session-utils.js b/keyserver/src/socket/session-utils.js index 69f79c632..33aad02ef 100644 --- a/keyserver/src/socket/session-utils.js +++ b/keyserver/src/socket/session-utils.js @@ -1,583 +1,581 @@ // @flow import invariant from 'invariant'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import { serverEntryInfo, serverEntryInfosObject, } from 'lib/shared/entry-utils.js'; import type { UpdateActivityResult } from 'lib/types/activity-types.js'; import type { IdentityKeysBlob } from 'lib/types/crypto-types.js'; import { isDeviceType } from 'lib/types/device-types.js'; import type { CalendarQuery, DeltaEntryInfosResponse, } from 'lib/types/entry-types.js'; import { reportTypes, type ThreadInconsistencyReportCreationRequest, type EntryInconsistencyReportCreationRequest, } from 'lib/types/report-types.js'; import { serverRequestTypes, type ThreadInconsistencyClientResponse, type EntryInconsistencyClientResponse, type ClientResponse, type ServerServerRequest, type ServerCheckStateServerRequest, } from 'lib/types/request-types.js'; import { sessionCheckFrequency } from 'lib/types/session-types.js'; import { signedIdentityKeysBlobValidator } from 'lib/utils/crypto-utils.js'; import { hash } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import { tShape, tPlatform, tPlatformDetails, } from 'lib/utils/validation-utils.js'; import { createOlmSession } from '../creators/olm-session-creator.js'; import { saveOneTimeKeys } from '../creators/one-time-keys-creator.js'; import createReport from '../creators/report-creator.js'; -import { SQL } from '../database/database.js'; import { fetchEntryInfos, fetchEntryInfosByID, fetchEntriesForSession, } from '../fetchers/entry-fetchers.js'; import { checkIfSessionHasEnoughOneTimeKeys } from '../fetchers/key-fetchers.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchCurrentUserInfo, fetchKnownUserInfos, } from '../fetchers/user-fetchers.js'; import { activityUpdatesInputValidator } from '../responders/activity-responders.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { threadInconsistencyReportValidatorShape, entryInconsistencyReportValidatorShape, } from '../responders/report-responders.js'; import { setNewSession, setCookiePlatform, setCookiePlatformDetails, setCookieSignedIdentityKeysBlob, } from '../session/cookies.js'; import type { Viewer } from '../session/viewer.js'; import { activityUpdater } from '../updaters/activity-updaters.js'; import { compareNewCalendarQuery } from '../updaters/entry-updaters.js'; import type { SessionUpdate } from '../updaters/session-updaters.js'; import { getOlmUtility } from '../utils/olm-utils.js'; const clientResponseInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'serverRequestTypes.PLATFORM', x => x === serverRequestTypes.PLATFORM, ), platform: tPlatform, }), tShape({ ...threadInconsistencyReportValidatorShape, type: t.irreducible( 'serverRequestTypes.THREAD_INCONSISTENCY', x => x === serverRequestTypes.THREAD_INCONSISTENCY, ), }), tShape({ ...entryInconsistencyReportValidatorShape, type: t.irreducible( 'serverRequestTypes.ENTRY_INCONSISTENCY', x => x === serverRequestTypes.ENTRY_INCONSISTENCY, ), }), tShape({ type: t.irreducible( 'serverRequestTypes.PLATFORM_DETAILS', x => x === serverRequestTypes.PLATFORM_DETAILS, ), platformDetails: tPlatformDetails, }), tShape({ type: t.irreducible( 'serverRequestTypes.CHECK_STATE', x => x === serverRequestTypes.CHECK_STATE, ), hashResults: t.dict(t.String, t.Boolean), }), tShape({ type: t.irreducible( 'serverRequestTypes.INITIAL_ACTIVITY_UPDATES', x => x === serverRequestTypes.INITIAL_ACTIVITY_UPDATES, ), activityUpdates: activityUpdatesInputValidator, }), tShape({ type: t.irreducible( 'serverRequestTypes.MORE_ONE_TIME_KEYS', x => x === serverRequestTypes.MORE_ONE_TIME_KEYS, ), keys: t.list(t.String), }), tShape({ type: t.irreducible( 'serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB', x => x === serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, ), signedIdentityKeysBlob: signedIdentityKeysBlobValidator, }), tShape({ type: t.irreducible( 'serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE', x => x === serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, ), initialNotificationsEncryptedMessage: t.String, }), ]); type StateCheckStatus = | { status: 'state_validated' } | { status: 'state_invalid', invalidKeys: $ReadOnlyArray } | { status: 'state_check' }; type ProcessClientResponsesResult = { serverRequests: ServerServerRequest[], stateCheckStatus: ?StateCheckStatus, activityUpdateResult: ?UpdateActivityResult, }; async function processClientResponses( viewer: Viewer, clientResponses: $ReadOnlyArray, ): Promise { let viewerMissingPlatform = !viewer.platform; const { platformDetails } = viewer; let viewerMissingPlatformDetails = !platformDetails || (isDeviceType(viewer.platform) && (platformDetails.codeVersion === null || platformDetails.codeVersion === undefined || platformDetails.stateVersion === null || platformDetails.stateVersion === undefined)); const promises = []; let activityUpdates = []; let stateCheckStatus = null; const clientSentPlatformDetails = clientResponses.some( response => response.type === serverRequestTypes.PLATFORM_DETAILS, ); for (const clientResponse of clientResponses) { if ( clientResponse.type === serverRequestTypes.PLATFORM && !clientSentPlatformDetails ) { promises.push(setCookiePlatform(viewer, clientResponse.platform)); viewerMissingPlatform = false; if (!isDeviceType(clientResponse.platform)) { viewerMissingPlatformDetails = false; } } else if ( clientResponse.type === serverRequestTypes.THREAD_INCONSISTENCY ) { promises.push(recordThreadInconsistency(viewer, clientResponse)); } else if (clientResponse.type === serverRequestTypes.ENTRY_INCONSISTENCY) { promises.push(recordEntryInconsistency(viewer, clientResponse)); } else if (clientResponse.type === serverRequestTypes.PLATFORM_DETAILS) { promises.push( setCookiePlatformDetails(viewer, clientResponse.platformDetails), ); viewerMissingPlatform = false; viewerMissingPlatformDetails = false; } else if ( clientResponse.type === serverRequestTypes.INITIAL_ACTIVITY_UPDATES ) { activityUpdates = [...activityUpdates, ...clientResponse.activityUpdates]; } else if (clientResponse.type === serverRequestTypes.CHECK_STATE) { const invalidKeys = []; for (const key in clientResponse.hashResults) { const result = clientResponse.hashResults[key]; if (!result) { invalidKeys.push(key); } } stateCheckStatus = invalidKeys.length > 0 ? { status: 'state_invalid', invalidKeys } : { status: 'state_validated' }; } else if (clientResponse.type === serverRequestTypes.MORE_ONE_TIME_KEYS) { invariant(clientResponse.keys, 'keys expected in client response'); handleAsyncPromise(saveOneTimeKeys(viewer, clientResponse.keys)); } else if ( clientResponse.type === serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB ) { invariant( clientResponse.signedIdentityKeysBlob, 'signedIdentityKeysBlob expected in client response', ); const { signedIdentityKeysBlob } = clientResponse; const identityKeys: IdentityKeysBlob = JSON.parse( signedIdentityKeysBlob.payload, ); const olmUtil = getOlmUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); handleAsyncPromise( setCookieSignedIdentityKeysBlob( viewer.cookieID, signedIdentityKeysBlob, ), ); } catch (e) { continue; } } else if ( clientResponse.type === serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE ) { invariant( t.String.is(clientResponse.initialNotificationsEncryptedMessage), 'initialNotificationsEncryptedMessage expected in client response', ); const { initialNotificationsEncryptedMessage } = clientResponse; try { await createOlmSession( initialNotificationsEncryptedMessage, 'notifications', viewer.cookieID, ); } catch (e) { continue; } } } const activityUpdatePromise = (async () => { if (activityUpdates.length === 0) { return undefined; } return await activityUpdater(viewer, { updates: activityUpdates }); })(); const serverRequests = []; const checkOneTimeKeysPromise = (async () => { if (!viewer.loggedIn) { return; } const enoughOneTimeKeys = await checkIfSessionHasEnoughOneTimeKeys( viewer.session, ); if (!enoughOneTimeKeys) { serverRequests.push({ type: serverRequestTypes.MORE_ONE_TIME_KEYS }); } })(); const { activityUpdateResult } = await promiseAll({ all: Promise.all(promises), activityUpdateResult: activityUpdatePromise, checkOneTimeKeysPromise, }); if ( !stateCheckStatus && viewer.loggedIn && viewer.sessionLastValidated + sessionCheckFrequency < Date.now() ) { stateCheckStatus = { status: 'state_check' }; } if (viewerMissingPlatform) { serverRequests.push({ type: serverRequestTypes.PLATFORM }); } if (viewerMissingPlatformDetails) { serverRequests.push({ type: serverRequestTypes.PLATFORM_DETAILS }); } return { serverRequests, stateCheckStatus, activityUpdateResult }; } async function recordThreadInconsistency( viewer: Viewer, response: ThreadInconsistencyClientResponse, ): Promise { const { type, ...rest } = response; const reportCreationRequest = ({ ...rest, type: reportTypes.THREAD_INCONSISTENCY, }: ThreadInconsistencyReportCreationRequest); await createReport(viewer, reportCreationRequest); } async function recordEntryInconsistency( viewer: Viewer, response: EntryInconsistencyClientResponse, ): Promise { const { type, ...rest } = response; const reportCreationRequest = ({ ...rest, type: reportTypes.ENTRY_INCONSISTENCY, }: EntryInconsistencyReportCreationRequest); await createReport(viewer, reportCreationRequest); } type SessionInitializationResult = | { sessionContinued: false } | { sessionContinued: true, deltaEntryInfoResult: DeltaEntryInfosResponse, sessionUpdate: SessionUpdate, }; async function initializeSession( viewer: Viewer, calendarQuery: CalendarQuery, oldLastUpdate: number, ): Promise { if (!viewer.loggedIn) { return { sessionContinued: false }; } if (!viewer.hasSessionInfo) { // If the viewer has no session info but is logged in, that is indicative // of an expired / invalidated session and we should generate a new one await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } if (oldLastUpdate < viewer.sessionLastUpdated) { // If the client has an older last_update than the server is tracking for // that client, then the client either had some issue persisting its store, // or the user restored the client app from a backup. Either way, we should // invalidate the existing session, since the server has assumed that the // checkpoint is further along than it is on the client, and might not still // have all of the updates necessary to do an incremental update await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } let comparisonResult = null; try { comparisonResult = compareNewCalendarQuery(viewer, calendarQuery); } catch (e) { if (e.message !== 'unknown_error') { throw e; } } if (comparisonResult) { const { difference, oldCalendarQuery } = comparisonResult; const sessionUpdate = { ...comparisonResult.sessionUpdate, lastUpdate: oldLastUpdate, }; const deltaEntryInfoResult = await fetchEntriesForSession( viewer, difference, oldCalendarQuery, ); return { sessionContinued: true, deltaEntryInfoResult, sessionUpdate }; } else { await setNewSession(viewer, calendarQuery, oldLastUpdate); return { sessionContinued: false }; } } type StateCheckResult = { sessionUpdate?: SessionUpdate, checkStateRequest?: ServerCheckStateServerRequest, }; async function checkState( viewer: Viewer, status: StateCheckStatus, calendarQuery: CalendarQuery, ): Promise { if (status.status === 'state_validated') { return { sessionUpdate: { lastValidated: Date.now() } }; } else if (status.status === 'state_check') { const promises = { threadsResult: fetchThreadInfos(viewer), entriesResult: fetchEntryInfos(viewer, [calendarQuery]), currentUserInfo: fetchCurrentUserInfo(viewer), userInfosResult: fetchKnownUserInfos(viewer), }; const fetchedData = await promiseAll(promises); const hashesToCheck = { threadInfos: hash(fetchedData.threadsResult.threadInfos), entryInfos: hash( serverEntryInfosObject(fetchedData.entriesResult.rawEntryInfos), ), currentUserInfo: hash(fetchedData.currentUserInfo), userInfos: hash(fetchedData.userInfosResult), }; const checkStateRequest = { type: serverRequestTypes.CHECK_STATE, hashesToCheck, }; return { checkStateRequest }; } const { invalidKeys } = status; let fetchAllThreads = false, fetchAllEntries = false, fetchAllUserInfos = false, fetchUserInfo = false; - const threadIDsToFetch = [], + const threadIDsToFetch = new Set(), entryIDsToFetch = [], userIDsToFetch = []; for (const key of invalidKeys) { if (key === 'threadInfos') { fetchAllThreads = true; } else if (key === 'entryInfos') { fetchAllEntries = true; } else if (key === 'userInfos') { fetchAllUserInfos = true; } else if (key === 'currentUserInfo') { fetchUserInfo = true; } else if (key.startsWith('threadInfo|')) { const [, threadID] = key.split('|'); - threadIDsToFetch.push(threadID); + threadIDsToFetch.add(threadID); } else if (key.startsWith('entryInfo|')) { const [, entryID] = key.split('|'); entryIDsToFetch.push(entryID); } else if (key.startsWith('userInfo|')) { const [, userID] = key.split('|'); userIDsToFetch.push(userID); } } const fetchPromises = {}; if (fetchAllThreads) { fetchPromises.threadsResult = fetchThreadInfos(viewer); - } else if (threadIDsToFetch.length > 0) { - fetchPromises.threadsResult = fetchThreadInfos( - viewer, - SQL`t.id IN (${threadIDsToFetch})`, - ); + } else if (threadIDsToFetch.size > 0) { + fetchPromises.threadsResult = fetchThreadInfos(viewer, { + threadIDs: threadIDsToFetch, + }); } if (fetchAllEntries) { fetchPromises.entriesResult = fetchEntryInfos(viewer, [calendarQuery]); } else if (entryIDsToFetch.length > 0) { fetchPromises.entryInfos = fetchEntryInfosByID(viewer, entryIDsToFetch); } if (fetchAllUserInfos) { fetchPromises.userInfos = fetchKnownUserInfos(viewer); } else if (userIDsToFetch.length > 0) { fetchPromises.userInfos = fetchKnownUserInfos(viewer, userIDsToFetch); } if (fetchUserInfo) { fetchPromises.currentUserInfo = fetchCurrentUserInfo(viewer); } const fetchedData = await promiseAll(fetchPromises); const hashesToCheck = {}, failUnmentioned = {}, stateChanges = {}; for (const key of invalidKeys) { if (key === 'threadInfos') { // Instead of returning all threadInfos, we want to narrow down and figure // out which threadInfos don't match first const { threadInfos } = fetchedData.threadsResult; for (const threadID in threadInfos) { hashesToCheck[`threadInfo|${threadID}`] = hash(threadInfos[threadID]); } failUnmentioned.threadInfos = true; } else if (key === 'entryInfos') { // Instead of returning all entryInfos, we want to narrow down and figure // out which entryInfos don't match first const { rawEntryInfos } = fetchedData.entriesResult; for (const rawEntryInfo of rawEntryInfos) { const entryInfo = serverEntryInfo(rawEntryInfo); invariant(entryInfo, 'should be set'); const { id: entryID } = entryInfo; invariant(entryID, 'should be set'); hashesToCheck[`entryInfo|${entryID}`] = hash(entryInfo); } failUnmentioned.entryInfos = true; } else if (key === 'userInfos') { // Instead of returning all userInfos, we want to narrow down and figure // out which userInfos don't match first const { userInfos } = fetchedData; for (const userID in userInfos) { hashesToCheck[`userInfo|${userID}`] = hash(userInfos[userID]); } failUnmentioned.userInfos = true; } else if (key === 'currentUserInfo') { stateChanges.currentUserInfo = fetchedData.currentUserInfo; } else if (key.startsWith('threadInfo|')) { const [, threadID] = key.split('|'); const { threadInfos } = fetchedData.threadsResult; const threadInfo = threadInfos[threadID]; if (!threadInfo) { if (!stateChanges.deleteThreadIDs) { stateChanges.deleteThreadIDs = []; } stateChanges.deleteThreadIDs.push(threadID); continue; } if (!stateChanges.rawThreadInfos) { stateChanges.rawThreadInfos = []; } stateChanges.rawThreadInfos.push(threadInfo); } else if (key.startsWith('entryInfo|')) { const [, entryID] = key.split('|'); const rawEntryInfos = fetchedData.entriesResult ? fetchedData.entriesResult.rawEntryInfos : fetchedData.entryInfos; const entryInfo = rawEntryInfos.find( candidate => candidate.id === entryID, ); if (!entryInfo) { if (!stateChanges.deleteEntryIDs) { stateChanges.deleteEntryIDs = []; } stateChanges.deleteEntryIDs.push(entryID); continue; } if (!stateChanges.rawEntryInfos) { stateChanges.rawEntryInfos = []; } stateChanges.rawEntryInfos.push(entryInfo); } else if (key.startsWith('userInfo|')) { const { userInfos: fetchedUserInfos } = fetchedData; const [, userID] = key.split('|'); const userInfo = fetchedUserInfos[userID]; if (!userInfo || !userInfo.username) { if (!stateChanges.deleteUserInfoIDs) { stateChanges.deleteUserInfoIDs = []; } stateChanges.deleteUserInfoIDs.push(userID); } else { if (!stateChanges.userInfos) { stateChanges.userInfos = []; } stateChanges.userInfos.push({ ...userInfo, // Flow gets confused if we don't do this username: userInfo.username, }); } } } const checkStateRequest = { type: serverRequestTypes.CHECK_STATE, hashesToCheck, failUnmentioned, stateChanges, }; if (Object.keys(hashesToCheck).length === 0) { return { checkStateRequest, sessionUpdate: { lastValidated: Date.now() } }; } else { return { checkStateRequest }; } } export { clientResponseInputValidator, processClientResponses, initializeSession, checkState, }; diff --git a/keyserver/src/updaters/thread-permission-updaters.js b/keyserver/src/updaters/thread-permission-updaters.js index 89eefb697..829ae19de 100644 --- a/keyserver/src/updaters/thread-permission-updaters.js +++ b/keyserver/src/updaters/thread-permission-updaters.js @@ -1,1288 +1,1288 @@ // @flow import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; import bots from 'lib/facts/bots.js'; import genesis from 'lib/facts/genesis.js'; import { makePermissionsBlob, makePermissionsForChildrenBlob, getRoleForPermissions, } from 'lib/permissions/thread-permissions.js'; import type { CalendarQuery } from 'lib/types/entry-types.js'; import type { ThreadPermissionsBlob, ThreadRolePermissionsBlob, } from 'lib/types/thread-permission-types.js'; import { type ThreadType, assertThreadType, } from 'lib/types/thread-types-enum.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { type ServerUpdateInfo, type CreateUpdatesResult, } from 'lib/types/update-types.js'; import { pushAll } from 'lib/utils/array.js'; import { ServerError } from 'lib/utils/errors.js'; import { updateChangedUndirectedRelationships } from './relationship-updaters.js'; import { createUpdates, type UpdatesForCurrentSession, } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchServerThreadInfos, rawThreadInfosFromServerThreadInfos, } from '../fetchers/thread-fetchers.js'; import { rescindPushNotifs } from '../push/rescind.js'; import { createScriptViewer } from '../session/scripts.js'; import type { Viewer } from '../session/viewer.js'; import { updateRoles } from '../updaters/role-updaters.js'; import DepthQueue from '../utils/depth-queue.js'; import RelationshipChangeset from '../utils/relationship-changeset.js'; export type MembershipRowToSave = { +operation: 'save', +intent: 'join' | 'leave' | 'none', +userID: string, +threadID: string, +userNeedsFullThreadDetails: boolean, +permissions: ?ThreadPermissionsBlob, +permissionsForChildren: ?ThreadPermissionsBlob, // null role represents by "0" +role: string, +oldRole: string, +unread?: boolean, }; type MembershipRowToDelete = { +operation: 'delete', +intent: 'join' | 'leave' | 'none', +userID: string, +threadID: string, +oldRole: string, }; type MembershipRow = MembershipRowToSave | MembershipRowToDelete; type Changeset = { +membershipRows: MembershipRow[], +relationshipChangeset: RelationshipChangeset, }; // 0 role means to remove the user from the thread // null role means to set the user to the default role // string role means to set the user to the role with that ID // -1 role means to set the user as a "ghost" (former member) type ChangeRoleOptions = { +setNewMembersToUnread?: boolean, }; type ChangeRoleMemberInfo = { permissionsFromParent?: ?ThreadPermissionsBlob, memberOfContainingThread?: boolean, }; async function changeRole( threadID: string, userIDs: $ReadOnlyArray, role: string | -1 | 0 | null, options?: ChangeRoleOptions, ): Promise { const intent = role === -1 || role === 0 ? 'leave' : 'join'; const setNewMembersToUnread = options?.setNewMembersToUnread && intent === 'join'; if (userIDs.length === 0) { return { membershipRows: [], relationshipChangeset: new RelationshipChangeset(), }; } const membershipQuery = SQL` SELECT user, role, permissions, permissions_for_children FROM memberships WHERE thread = ${threadID} `; const parentMembershipQuery = SQL` SELECT pm.user, pm.permissions_for_children AS permissions_from_parent FROM threads t INNER JOIN memberships pm ON pm.thread = t.parent_thread_id WHERE t.id = ${threadID} AND (pm.user IN (${userIDs}) OR t.parent_thread_id != ${genesis.id}) `; const containingMembershipQuery = SQL` SELECT cm.user, cm.role AS containing_role FROM threads t INNER JOIN memberships cm ON cm.thread = t.containing_thread_id WHERE t.id = ${threadID} AND cm.user IN (${userIDs}) `; const [ [membershipResults], [parentMembershipResults], containingMembershipResults, roleThreadResult, ] = await Promise.all([ dbQuery(membershipQuery), dbQuery(parentMembershipQuery), (async () => { if (intent === 'leave') { // Membership in the container only needs to be checked for members return []; } const [result] = await dbQuery(containingMembershipQuery); return result; })(), changeRoleThreadQuery(threadID, role), ]); const { roleColumnValue: intendedRole, threadType, parentThreadID, hasContainingThreadID, rolePermissions: intendedRolePermissions, depth, } = roleThreadResult; const existingMembershipInfo = new Map(); for (const row of membershipResults) { const userID = row.user.toString(); existingMembershipInfo.set(userID, { oldRole: row.role.toString(), oldPermissions: JSON.parse(row.permissions), oldPermissionsForChildren: JSON.parse(row.permissions_for_children), }); } const ancestorMembershipInfo: Map = new Map(); for (const row of parentMembershipResults) { const userID = row.user.toString(); if (!userIDs.includes(userID)) { continue; } ancestorMembershipInfo.set(userID, { permissionsFromParent: JSON.parse(row.permissions_from_parent), }); } for (const row of containingMembershipResults) { const userID = row.user.toString(); const ancestorMembership = ancestorMembershipInfo.get(userID); const memberOfContainingThread = row.containing_role > 0; if (ancestorMembership) { ancestorMembership.memberOfContainingThread = memberOfContainingThread; } else { ancestorMembershipInfo.set(userID, { memberOfContainingThread, }); } } const relationshipChangeset = new RelationshipChangeset(); const existingMemberIDs = [...existingMembershipInfo.keys()]; if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const parentMemberIDs = parentMembershipResults.map(row => row.user.toString(), ); if (parentThreadID && parentThreadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(parentMemberIDs); } const membershipRows = []; const toUpdateDescendants = new Map(); for (const userID of userIDs) { const existingMembership = existingMembershipInfo.get(userID); const oldRole = existingMembership?.oldRole ?? '-1'; const oldPermissions = existingMembership?.oldPermissions ?? null; const oldPermissionsForChildren = existingMembership?.oldPermissionsForChildren ?? null; if (existingMembership && oldRole === intendedRole) { // If the old role is the same as the new one, we have nothing to update continue; } else if (Number(oldRole) > 0 && role === null) { // In the case where we're just trying to add somebody to a thread, if // they already have a role with a nonzero role then we don't need to do // anything continue; } let permissionsFromParent = null; let memberOfContainingThread = false; const ancestorMembership = ancestorMembershipInfo.get(userID); if (ancestorMembership) { permissionsFromParent = ancestorMembership.permissionsFromParent; memberOfContainingThread = ancestorMembership.memberOfContainingThread; } if (!hasContainingThreadID) { memberOfContainingThread = true; } const rolePermissions = memberOfContainingThread ? intendedRolePermissions : null; const targetRole = memberOfContainingThread ? intendedRole : '-1'; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userBecameMember = Number(oldRole) <= 0 && Number(newRole) > 0; const userLostMembership = Number(oldRole) > 0 && Number(newRole) <= 0; if ( (intent === 'join' && Number(newRole) <= 0) || (intent === 'leave' && Number(newRole) > 0) ) { throw new ServerError('invalid_parameters'); } else if (intendedRole !== newRole) { console.warn( `changeRole called for role=${intendedRole}, but ended up setting ` + `role=${newRole} for userID ${userID} and threadID ${threadID}, ` + 'probably because KNOW_OF permission was unexpectedly present or ' + 'missing', ); } if ( existingMembership && _isEqual(permissions)(oldPermissions) && oldRole === newRole ) { // This thread and all of its descendants need no updates for this user, // since the corresponding memberships row is unchanged by this operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent, userID, threadID, userNeedsFullThreadDetails: userBecameMember, permissions, permissionsForChildren, role: newRole, oldRole, unread: userBecameMember && setNewMembersToUnread, }); } else { membershipRows.push({ operation: 'delete', intent, userID, threadID, oldRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { relationshipChangeset.setRelationshipsNeeded(userID, existingMemberIDs); } if ( userLostMembership || !_isEqual(permissionsForChildren)(oldPermissionsForChildren) ) { toUpdateDescendants.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (toUpdateDescendants.size > 0) { const { membershipRows: descendantMembershipRows, relationshipChangeset: descendantRelationshipChangeset, } = await updateDescendantPermissions({ threadID, depth, changesByUser: toUpdateDescendants, }); pushAll(membershipRows, descendantMembershipRows); relationshipChangeset.addAll(descendantRelationshipChangeset); } return { membershipRows, relationshipChangeset }; } type RoleThreadResult = { +roleColumnValue: string, +depth: number, +threadType: ThreadType, +parentThreadID: ?string, +hasContainingThreadID: boolean, +rolePermissions: ?ThreadRolePermissionsBlob, }; async function changeRoleThreadQuery( threadID: string, role: string | -1 | 0 | null, ): Promise { if (role === 0 || role === -1) { const query = SQL` SELECT type, depth, parent_thread_id, containing_thread_id FROM threads WHERE id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: role.toString(), depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: null, }; } else if (role !== null) { const query = SQL` SELECT t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions FROM threads t INNER JOIN roles r ON r.thread = t.id AND r.id = ${role} WHERE t.id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: role, depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: JSON.parse(row.permissions), }; } else { const query = SQL` SELECT t.type, t.depth, t.parent_thread_id, t.containing_thread_id, t.default_role, r.permissions FROM threads t INNER JOIN roles r ON r.thread = t.id AND r.id = t.default_role WHERE t.id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: row.default_role.toString(), depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: JSON.parse(row.permissions), }; } } type ChangedAncestor = { +threadID: string, +depth: number, +changesByUser: Map, }; type AncestorChanges = { +userIsMember: boolean, +permissionsForChildren: ?ThreadPermissionsBlob, }; async function updateDescendantPermissions( initialChangedAncestor: ChangedAncestor, ): Promise { const membershipRows = []; const relationshipChangeset = new RelationshipChangeset(); const initialDescendants = await fetchDescendantsForUpdate([ initialChangedAncestor, ]); const depthQueue = new DepthQueue( getDescendantDepth, getDescendantKey, mergeDescendants, ); depthQueue.addInfos(initialDescendants); let descendants; while ((descendants = depthQueue.getNextDepth())) { const descendantsAsAncestors = []; for (const descendant of descendants) { const { threadID, threadType, depth, users } = descendant; const existingMembers = [...users.entries()]; const existingMemberIDs = existingMembers .filter(([, { curRole }]) => curRole) .map(([userID]) => userID); if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const usersForNextLayer = new Map(); for (const [userID, user] of users) { const { curRolePermissions, curPermissionsFromParent, curMemberOfContainingThread, nextMemberOfContainingThread, nextPermissionsFromParent, potentiallyNeedsUpdate, } = user; const existingMembership = !!user.curRole; const curRole = user.curRole ?? '-1'; const curPermissions = user.curPermissions ?? null; const curPermissionsForChildren = user.curPermissionsForChildren ?? null; if (!potentiallyNeedsUpdate) { continue; } const permissionsFromParent = nextPermissionsFromParent === undefined ? curPermissionsFromParent : nextPermissionsFromParent; const memberOfContainingThread = nextMemberOfContainingThread === undefined ? curMemberOfContainingThread : nextMemberOfContainingThread; const targetRole = memberOfContainingThread ? curRole : '-1'; const rolePermissions = memberOfContainingThread ? curRolePermissions : null; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userLostMembership = Number(curRole) > 0 && Number(newRole) <= 0; if (_isEqual(permissions)(curPermissions) && curRole === newRole) { // This thread and all of its descendants need no updates for this // user, since the corresponding memberships row is unchanged by this // operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent: 'none', userID, threadID, userNeedsFullThreadDetails: false, permissions, permissionsForChildren, role: newRole, oldRole: curRole, }); } else { membershipRows.push({ operation: 'delete', intent: 'none', userID, threadID, oldRole: curRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { // If there was no membership row before, and we are creating one, // we'll need to make sure the new member has a relationship row with // each existing member. We expect that whoever called us already // generated memberships row for the new members, will will lead // saveMemberships to generate relationships rows between those new // users. relationshipChangeset.setRelationshipsNeeded( userID, existingMemberIDs, ); } if ( userLostMembership || !_isEqual(permissionsForChildren)(curPermissionsForChildren) ) { usersForNextLayer.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (usersForNextLayer.size > 0) { descendantsAsAncestors.push({ threadID, depth, changesByUser: usersForNextLayer, }); } } const nextDescendants = await fetchDescendantsForUpdate( descendantsAsAncestors, ); depthQueue.addInfos(nextDescendants); } return { membershipRows, relationshipChangeset }; } type DescendantUserInfo = $Shape<{ curRole?: string, curRolePermissions?: ?ThreadRolePermissionsBlob, curPermissions?: ?ThreadPermissionsBlob, curPermissionsForChildren?: ?ThreadPermissionsBlob, curPermissionsFromParent?: ?ThreadPermissionsBlob, curMemberOfContainingThread?: boolean, nextPermissionsFromParent?: ?ThreadPermissionsBlob, nextMemberOfContainingThread?: boolean, potentiallyNeedsUpdate?: boolean, }>; type DescendantInfo = { +threadID: string, +parentThreadID: string, +containingThreadID: string, +threadType: ThreadType, +depth: number, +users: Map, }; const fetchDescendantsBatchSize = 10; async function fetchDescendantsForUpdate( ancestors: $ReadOnlyArray, ): Promise { const threadIDs = ancestors.map(ancestor => ancestor.threadID); const rows = []; while (threadIDs.length > 0) { const batch = threadIDs.splice(0, fetchDescendantsBatchSize); const query = SQL` SELECT t.id, m.user, t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions AS role_permissions, m.permissions, m.permissions_for_children, m.role, pm.permissions_for_children AS permissions_from_parent, cm.role AS containing_role FROM threads t INNER JOIN memberships m ON m.thread = t.id LEFT JOIN memberships pm ON pm.thread = t.parent_thread_id AND pm.user = m.user LEFT JOIN memberships cm ON cm.thread = t.containing_thread_id AND cm.user = m.user LEFT JOIN roles r ON r.id = m.role WHERE t.parent_thread_id IN (${batch}) OR t.containing_thread_id IN (${batch}) `; const [results] = await dbQuery(query); pushAll(rows, results); } const descendantThreadInfos: Map = new Map(); for (const row of rows) { const descendantThreadID = row.id.toString(); if (!descendantThreadInfos.has(descendantThreadID)) { descendantThreadInfos.set(descendantThreadID, { threadID: descendantThreadID, parentThreadID: row.parent_thread_id.toString(), containingThreadID: row.containing_thread_id.toString(), threadType: assertThreadType(row.type), depth: row.depth, users: new Map(), }); } const descendantThreadInfo = descendantThreadInfos.get(descendantThreadID); invariant( descendantThreadInfo, `value should exist for key ${descendantThreadID}`, ); const userID = row.user.toString(); descendantThreadInfo.users.set(userID, { curRole: row.role.toString(), curRolePermissions: JSON.parse(row.role_permissions), curPermissions: JSON.parse(row.permissions), curPermissionsForChildren: JSON.parse(row.permissions_for_children), curPermissionsFromParent: JSON.parse(row.permissions_from_parent), curMemberOfContainingThread: row.containing_role > 0, }); } for (const ancestor of ancestors) { const { threadID, changesByUser } = ancestor; for (const [userID, changes] of changesByUser) { for (const descendantThreadInfo of descendantThreadInfos.values()) { const { users, parentThreadID, containingThreadID } = descendantThreadInfo; if (threadID !== parentThreadID && threadID !== containingThreadID) { continue; } let user = users.get(userID); if (!user) { user = {}; users.set(userID, user); } if (threadID === parentThreadID) { user.nextPermissionsFromParent = changes.permissionsForChildren; user.potentiallyNeedsUpdate = true; } if (threadID === containingThreadID) { user.nextMemberOfContainingThread = changes.userIsMember; if (!user.nextMemberOfContainingThread) { user.potentiallyNeedsUpdate = true; } } } } } return [...descendantThreadInfos.values()]; } function getDescendantDepth(descendant: DescendantInfo): number { return descendant.depth; } function getDescendantKey(descendant: DescendantInfo): string { return descendant.threadID; } function mergeDescendants( a: DescendantInfo, b: DescendantInfo, ): DescendantInfo { const { users: usersA, ...restA } = a; const { users: usersB, ...restB } = b; if (!_isEqual(restA)(restB)) { console.warn( `inconsistent descendantInfos ${JSON.stringify(restA)}, ` + JSON.stringify(restB), ); throw new ServerError('internal_error'); } const newUsers = new Map(usersA); for (const [userID, userFromB] of usersB) { const userFromA = newUsers.get(userID); if (!userFromA) { newUsers.set(userID, userFromB); } else { newUsers.set(userID, { ...userFromA, ...userFromB }); } } return { ...a, users: newUsers }; } type RecalculatePermissionsMemberInfo = { role?: ?string, permissions?: ?ThreadPermissionsBlob, permissionsForChildren?: ?ThreadPermissionsBlob, rolePermissions?: ?ThreadRolePermissionsBlob, memberOfContainingThread?: boolean, permissionsFromParent?: ?ThreadPermissionsBlob, }; async function recalculateThreadPermissions( threadID: string, ): Promise { const threadQuery = SQL` SELECT type, depth, parent_thread_id, containing_thread_id FROM threads WHERE id = ${threadID} `; const membershipQuery = SQL` SELECT m.user, m.role, m.permissions, m.permissions_for_children, r.permissions AS role_permissions, cm.role AS containing_role FROM threads t INNER JOIN memberships m ON m.thread = t.id LEFT JOIN roles r ON r.id = m.role LEFT JOIN memberships cm ON cm.user = m.user AND cm.thread = t.containing_thread_id WHERE t.id = ${threadID} `; const parentMembershipQuery = SQL` SELECT pm.user, pm.permissions_for_children AS permissions_from_parent FROM threads t INNER JOIN memberships pm ON pm.thread = t.parent_thread_id WHERE t.id = ${threadID} `; const [[threadResults], [membershipResults], [parentMembershipResults]] = await Promise.all([ dbQuery(threadQuery), dbQuery(membershipQuery), dbQuery(parentMembershipQuery), ]); if (threadResults.length !== 1) { throw new ServerError('internal_error'); } const [threadResult] = threadResults; const threadType = assertThreadType(threadResult.type); const depth = threadResult.depth; const hasContainingThreadID = threadResult.containing_thread_id !== null; const parentThreadID = threadResult.parent_thread_id?.toString(); const membershipInfo: Map = new Map(); for (const row of membershipResults) { const userID = row.user.toString(); membershipInfo.set(userID, { role: row.role.toString(), permissions: JSON.parse(row.permissions), permissionsForChildren: JSON.parse(row.permissions_for_children), rolePermissions: JSON.parse(row.role_permissions), memberOfContainingThread: !!( row.containing_role && row.containing_role > 0 ), }); } for (const row of parentMembershipResults) { const userID = row.user.toString(); const permissionsFromParent = JSON.parse(row.permissions_from_parent); const membership = membershipInfo.get(userID); if (membership) { membership.permissionsFromParent = permissionsFromParent; } else { membershipInfo.set(userID, { permissionsFromParent: permissionsFromParent, }); } } const relationshipChangeset = new RelationshipChangeset(); const existingMemberIDs = membershipResults.map(row => row.user.toString()); if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const parentMemberIDs = parentMembershipResults.map(row => row.user.toString(), ); if (parentThreadID && parentThreadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(parentMemberIDs); } const membershipRows = []; const toUpdateDescendants = new Map(); for (const [userID, membership] of membershipInfo) { const { rolePermissions: intendedRolePermissions, permissionsFromParent } = membership; const oldPermissions = membership?.permissions ?? null; const oldPermissionsForChildren = membership?.permissionsForChildren ?? null; const existingMembership = membership.role !== undefined; const oldRole = membership.role ?? '-1'; const memberOfContainingThread = hasContainingThreadID ? !!membership.memberOfContainingThread : true; const targetRole = memberOfContainingThread ? oldRole : '-1'; const rolePermissions = memberOfContainingThread ? intendedRolePermissions : null; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userLostMembership = Number(oldRole) > 0 && Number(newRole) <= 0; if (_isEqual(permissions)(oldPermissions) && oldRole === newRole) { // This thread and all of its descendants need no updates for this user, // since the corresponding memberships row is unchanged by this operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent: 'none', userID, threadID, userNeedsFullThreadDetails: false, permissions, permissionsForChildren, role: newRole, oldRole, }); } else { membershipRows.push({ operation: 'delete', intent: 'none', userID, threadID, oldRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { // If there was no membership row before, and we are creating one, // we'll need to make sure the new member has a relationship row with // each existing member. We handle guaranteeing that new members have // relationship rows with each other in saveMemberships. relationshipChangeset.setRelationshipsNeeded(userID, existingMemberIDs); } if ( userLostMembership || !_isEqual(permissionsForChildren)(oldPermissionsForChildren) ) { toUpdateDescendants.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (toUpdateDescendants.size > 0) { const { membershipRows: descendantMembershipRows, relationshipChangeset: descendantRelationshipChangeset, } = await updateDescendantPermissions({ threadID, depth, changesByUser: toUpdateDescendants, }); pushAll(membershipRows, descendantMembershipRows); relationshipChangeset.addAll(descendantRelationshipChangeset); } return { membershipRows, relationshipChangeset }; } const defaultSubscriptionString = JSON.stringify({ home: false, pushNotifs: false, }); const joinSubscriptionString = JSON.stringify({ home: true, pushNotifs: true }); const membershipInsertBatchSize = 50; async function saveMemberships(toSave: $ReadOnlyArray) { if (toSave.length === 0) { return; } const time = Date.now(); const insertRows = []; for (const rowToSave of toSave) { insertRows.push([ rowToSave.userID, rowToSave.threadID, rowToSave.role, time, rowToSave.intent === 'join' ? joinSubscriptionString : defaultSubscriptionString, rowToSave.permissions ? JSON.stringify(rowToSave.permissions) : null, rowToSave.permissionsForChildren ? JSON.stringify(rowToSave.permissionsForChildren) : null, rowToSave.unread ? 1 : 0, 0, ]); } // Logic below will only update an existing membership row's `subscription` // column if the user is either joining or leaving the thread. That means // there's no way to use this function to update a user's subscription without // also making them join or leave the thread. The reason we do this is because // we need to specify a value for `subscription` here, as it's a non-null // column and this is an INSERT, but we don't want to require people to have // to know the current `subscription` when they're just using this function to // update the permissions of an existing membership row. while (insertRows.length > 0) { const batch = insertRows.splice(0, membershipInsertBatchSize); const query = SQL` INSERT INTO memberships (user, thread, role, creation_time, subscription, permissions, permissions_for_children, last_message, last_read_message) VALUES ${batch} ON DUPLICATE KEY UPDATE subscription = IF( (role <= 0 AND VALUE(role) > 0) OR (role > 0 AND VALUE(role) <= 0), VALUE(subscription), subscription ), role = VALUE(role), permissions = VALUE(permissions), permissions_for_children = VALUE(permissions_for_children) `; await dbQuery(query); } } async function deleteMemberships( toDelete: $ReadOnlyArray, ) { if (toDelete.length === 0) { return; } const time = Date.now(); const insertRows = toDelete.map(rowToDelete => [ rowToDelete.userID, rowToDelete.threadID, -1, time, defaultSubscriptionString, null, null, 0, 0, ]); while (insertRows.length > 0) { const batch = insertRows.splice(0, membershipInsertBatchSize); const query = SQL` INSERT INTO memberships (user, thread, role, creation_time, subscription, permissions, permissions_for_children, last_message, last_read_message) VALUES ${batch} ON DUPLICATE KEY UPDATE role = -1, permissions = NULL, permissions_for_children = NULL, subscription = ${defaultSubscriptionString}, last_message = 0, last_read_message = 0 `; await dbQuery(query); } } const emptyCommitMembershipChangesetConfig = Object.freeze({}); // Specify non-empty changedThreadIDs to force updates to be generated for those // threads, presumably for reasons not covered in the changeset. calendarQuery // only needs to be specified if a JOIN_THREAD update will be generated for the // viewer, in which case it's necessary for knowing the set of entries to fetch. type ChangesetCommitResult = { ...CreateUpdatesResult, }; async function commitMembershipChangeset( viewer: Viewer, changeset: Changeset, { changedThreadIDs = new Set(), calendarQuery, updatesForCurrentSession = 'return', }: { +changedThreadIDs?: Set, +calendarQuery?: ?CalendarQuery, +updatesForCurrentSession?: UpdatesForCurrentSession, } = emptyCommitMembershipChangesetConfig, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const { membershipRows, relationshipChangeset } = changeset; const membershipRowMap = new Map(); for (const row of membershipRows) { const { userID, threadID } = row; changedThreadIDs.add(threadID); const pairString = `${userID}|${threadID}`; const existing = membershipRowMap.get(pairString); invariant( !existing || existing.intent === 'none' || row.intent === 'none', `multiple intents provided for ${pairString}`, ); if (!existing || existing.intent === 'none') { membershipRowMap.set(pairString, row); } } const toSave = [], toDelete = [], toRescindPushNotifs = []; for (const row of membershipRowMap.values()) { if ( row.operation === 'delete' || (row.operation === 'save' && Number(row.role) <= 0) ) { const { userID, threadID } = row; toRescindPushNotifs.push({ userID, threadID }); } if (row.operation === 'delete') { toDelete.push(row); } else { toSave.push(row); } } const threadsToSavedUsers = new Map(); for (const row of membershipRowMap.values()) { const { userID, threadID } = row; let savedUsers = threadsToSavedUsers.get(threadID); if (!savedUsers) { savedUsers = []; threadsToSavedUsers.set(threadID, savedUsers); } savedUsers.push(userID); } for (const [threadID, savedUsers] of threadsToSavedUsers) { if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsNeeded(savedUsers); } } const relationshipRows = relationshipChangeset.getRows(); const [updateDatas] = await Promise.all([ updateChangedUndirectedRelationships(relationshipRows), saveMemberships(toSave), deleteMemberships(toDelete), rescindPushNotifsForMemberDeletion(toRescindPushNotifs), ]); - const serverThreadInfoFetchResult = await fetchServerThreadInfos( - SQL`t.id IN (${[...changedThreadIDs]})`, - ); + const serverThreadInfoFetchResult = await fetchServerThreadInfos({ + threadIDs: changedThreadIDs, + }); const { threadInfos: serverThreadInfos } = serverThreadInfoFetchResult; const time = Date.now(); for (const changedThreadID of changedThreadIDs) { const serverThreadInfo = serverThreadInfos[changedThreadID]; for (const memberInfo of serverThreadInfo.members) { const pairString = `${memberInfo.id}|${serverThreadInfo.id}`; const membershipRow = membershipRowMap.get(pairString); if (membershipRow) { continue; } updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID: memberInfo.id, time, threadID: changedThreadID, }); } } for (const row of membershipRowMap.values()) { const { userID, threadID } = row; if (row.operation === 'delete' || row.role === '-1') { if (row.oldRole !== '-1') { updateDatas.push({ type: updateTypes.DELETE_THREAD, userID, time, threadID, }); } } else if (row.userNeedsFullThreadDetails) { updateDatas.push({ type: updateTypes.JOIN_THREAD, userID, time, threadID, }); } else { updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID, time, threadID, }); } } const threadInfoFetchResult = rawThreadInfosFromServerThreadInfos( viewer, serverThreadInfoFetchResult, ); const { viewerUpdates, userInfos } = await createUpdates(updateDatas, { viewer, calendarQuery, ...threadInfoFetchResult, updatesForCurrentSession, }); return { userInfos, viewerUpdates, }; } const emptyGetChangesetCommitResultConfig = Object.freeze({}); // When the user tries to create a new thread, it's possible for the client to // fail the creation even if a row gets added to the threads table. This may // occur due to a timeout (on either the client or server side), or due to some // error in the server code following the INSERT operation. Handling the error // scenario is more challenging since it would require detecting which set of // operations failed so we could retry them. As a result, this code is geared at // only handling the timeout scenario. async function getChangesetCommitResultForExistingThread( viewer: Viewer, threadID: string, otherUpdates: $ReadOnlyArray, { calendarQuery, updatesForCurrentSession = 'return', }: { +calendarQuery?: ?CalendarQuery, +updatesForCurrentSession?: UpdatesForCurrentSession, } = emptyGetChangesetCommitResultConfig, ): Promise { for (const update of otherUpdates) { if ( update.type === updateTypes.JOIN_THREAD && update.threadInfo.id === threadID ) { // If the JOIN_THREAD is already there we can expect // the appropriate UPDATE_USERs to be covered as well return { viewerUpdates: otherUpdates, userInfos: {} }; } } const time = Date.now(); const updateDatas = [ { type: updateTypes.JOIN_THREAD, userID: viewer.userID, time, threadID, targetSession: viewer.session, }, ]; // To figure out what UserInfos might be missing, we consider the worst case: // the same client previously attempted to create a thread with a non-friend // they found via search results, but the request timed out. In this scenario // the viewer might never have received the UPDATE_USER that would add that // UserInfo to their UserStore, but the server assumed the client had gotten // it because createUpdates was called with UpdatesForCurrentSession=return. // For completeness here we query for the full list of memberships rows in the // thread. We can't use fetchServerThreadInfos because it skips role=-1 rows const membershipsQuery = SQL` SELECT user FROM memberships WHERE thread = ${threadID} AND user != ${viewer.userID} `; const [results] = await dbQuery(membershipsQuery); for (const row of results) { updateDatas.push({ type: updateTypes.UPDATE_USER, userID: viewer.userID, time, updatedUserID: row.user.toString(), targetSession: viewer.session, }); } const { viewerUpdates, userInfos } = await createUpdates(updateDatas, { viewer, calendarQuery, updatesForCurrentSession, }); return { viewerUpdates: [...otherUpdates, ...viewerUpdates], userInfos }; } const rescindPushNotifsBatchSize = 3; async function rescindPushNotifsForMemberDeletion( toRescindPushNotifs: $ReadOnlyArray<{ +userID: string, +threadID: string }>, ): Promise { const queue = [...toRescindPushNotifs]; while (queue.length > 0) { const batch = queue.splice(0, rescindPushNotifsBatchSize); await Promise.all( batch.map(({ userID, threadID }) => rescindPushNotifs( SQL`n.thread = ${threadID} AND n.user = ${userID}`, SQL`IF(m.thread = ${threadID}, NULL, m.thread)`, ), ), ); } } // Deprecated - use updateRolesAndPermissionsForAllThreads instead async function DEPRECATED_recalculateAllThreadPermissions() { const getAllThreads = SQL`SELECT id FROM threads`; const [result] = await dbQuery(getAllThreads); // We handle each thread one-by-one to avoid a situation where a permission // calculation for a child thread, done during a call to // recalculateThreadPermissions for the parent thread, can be incorrectly // overriden by a call to recalculateThreadPermissions for the child thread. // If the changeset resulting from the parent call isn't committed before the // calculation is done for the child, the calculation done for the child can // be incorrect. const viewer = createScriptViewer(bots.commbot.userID); for (const row of result) { const threadID = row.id.toString(); const changeset = await recalculateThreadPermissions(threadID); await commitMembershipChangeset(viewer, changeset); } } async function updateRolesAndPermissionsForAllThreads() { const batchSize = 10; const fetchThreads = SQL`SELECT id, type, depth FROM threads`; const [result] = await dbQuery(fetchThreads); const allThreads = result.map(row => { return { id: row.id.toString(), type: assertThreadType(row.type), depth: row.depth, }; }); const viewer = createScriptViewer(bots.commbot.userID); const maxDepth = Math.max(...allThreads.map(row => row.depth)); for (let depth = 0; depth <= maxDepth; depth++) { const threads = allThreads.filter(row => row.depth === depth); console.log(`recalculating permissions for threads with depth ${depth}`); while (threads.length > 0) { const batch = threads.splice(0, batchSize); const membershipRows = []; const relationshipChangeset = new RelationshipChangeset(); await Promise.all( batch.map(async thread => { console.log(`updating roles for ${thread.id}`); await updateRoles(viewer, thread.id, thread.type); console.log(`recalculating permissions for ${thread.id}`); const { membershipRows: threadMembershipRows, relationshipChangeset: threadRelationshipChangeset, } = await recalculateThreadPermissions(thread.id); membershipRows.push(...threadMembershipRows); relationshipChangeset.addAll(threadRelationshipChangeset); }), ); console.log(`committing batch ${JSON.stringify(batch)}`); await commitMembershipChangeset(viewer, { membershipRows, relationshipChangeset, }); } } } export { changeRole, recalculateThreadPermissions, getChangesetCommitResultForExistingThread, saveMemberships, commitMembershipChangeset, DEPRECATED_recalculateAllThreadPermissions, updateRolesAndPermissionsForAllThreads, }; diff --git a/keyserver/src/updaters/thread-updaters.js b/keyserver/src/updaters/thread-updaters.js index 9f1e130c9..30fd6a046 100644 --- a/keyserver/src/updaters/thread-updaters.js +++ b/keyserver/src/updaters/thread-updaters.js @@ -1,930 +1,930 @@ // @flow import { getRolePermissionBlobs } from 'lib/permissions/thread-permissions.js'; import { filteredThreadIDs } from 'lib/selectors/calendar-filter-selectors.js'; import { getPinnedContentFromMessage } from 'lib/shared/message-utils.js'; import { threadHasAdminRole, roleIsAdminRole, viewerIsMember, getThreadTypeParentRequirement, validChatNameRegex, } from 'lib/shared/thread-utils.js'; import type { Shape } from 'lib/types/core.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import { threadPermissions } from 'lib/types/thread-permission-types.js'; import { threadTypes } from 'lib/types/thread-types-enum.js'; import { type RoleChangeRequest, type ChangeThreadSettingsResult, type RemoveMembersRequest, type LeaveThreadRequest, type LeaveThreadResult, type UpdateThreadRequest, type ServerThreadJoinRequest, type ThreadJoinResult, type ToggleMessagePinRequest, type ToggleMessagePinResult, } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { ServerError } from 'lib/utils/errors.js'; import { promiseAll } from 'lib/utils/promises.js'; import { firstLine } from 'lib/utils/string-utils.js'; import { reportLinkUsage } from './link-updaters.js'; import { updateRoles } from './role-updaters.js'; import { changeRole, recalculateThreadPermissions, commitMembershipChangeset, } from './thread-permission-updaters.js'; import createMessages from '../creators/message-creator.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { checkIfInviteLinkIsValid } from '../fetchers/link-fetchers.js'; import { fetchMessageInfoByID } from '../fetchers/message-fetchers.js'; import { fetchThreadInfos, fetchServerThreadInfos, determineThreadAncestry, } from '../fetchers/thread-fetchers.js'; import { checkThreadPermission, viewerIsMember as fetchViewerIsMember, checkThread, validateCandidateMembers, } from '../fetchers/thread-permission-fetchers.js'; import { verifyUserIDs, verifyUserOrCookieIDs, } from '../fetchers/user-fetchers.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import type { Viewer } from '../session/viewer.js'; import RelationshipChangeset from '../utils/relationship-changeset.js'; async function updateRole( viewer: Viewer, request: RoleChangeRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [memberIDs, hasPermission, fetchThreadResult] = await Promise.all([ verifyUserIDs(request.memberIDs), checkThreadPermission( viewer, request.threadID, threadPermissions.CHANGE_ROLE, ), - fetchThreadInfos(viewer, SQL`t.id = ${request.threadID}`), + fetchThreadInfos(viewer, { threadID: request.threadID }), ]); if (memberIDs.length === 0) { throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_credentials'); } const threadInfo = fetchThreadResult.threadInfos[request.threadID]; if (!threadInfo) { throw new ServerError('invalid_parameters'); } const adminRoleID = Object.keys(threadInfo.roles).find( roleID => threadInfo.roles[roleID].name === 'Admins', ); // Ensure that there will always still be at least one admin in a community if (adminRoleID) { const memberRoles = memberIDs.map( memberID => threadInfo.members.find(member => member.id === memberID)?.role, ); const communityAdminsCount = threadInfo.members.filter( member => member.role === adminRoleID, ).length; const changedAdminsCount = memberRoles.filter( memberRole => memberRole === adminRoleID, ).length; if (changedAdminsCount >= communityAdminsCount) { throw new ServerError('invalid_parameters'); } } const query = SQL` SELECT user, role FROM memberships WHERE user IN (${memberIDs}) AND thread = ${request.threadID} `; const [result] = await dbQuery(query); let nonMemberUser = false; let numResults = 0; for (const row of result) { if (row.role <= 0) { nonMemberUser = true; break; } numResults++; } if (nonMemberUser || numResults < memberIDs.length) { throw new ServerError('invalid_parameters'); } const changeset = await changeRole(request.threadID, memberIDs, request.role); const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset); const messageData = { type: messageTypes.CHANGE_ROLE, threadID: request.threadID, creatorID: viewer.userID, time: Date.now(), userIDs: memberIDs, newRole: request.role, }; const newMessageInfos = await createMessages(viewer, [messageData]); return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } async function removeMembers( viewer: Viewer, request: RemoveMembersRequest, ): Promise { const viewerID = viewer.userID; if (request.memberIDs.includes(viewerID)) { throw new ServerError('invalid_parameters'); } const [memberIDs, hasPermission] = await Promise.all([ verifyUserOrCookieIDs(request.memberIDs), checkThreadPermission( viewer, request.threadID, threadPermissions.REMOVE_MEMBERS, ), ]); if (memberIDs.length === 0) { throw new ServerError('invalid_parameters'); } if (!hasPermission) { throw new ServerError('invalid_credentials'); } const query = SQL` SELECT m.user, m.role, t.default_role FROM memberships m LEFT JOIN threads t ON t.id = m.thread WHERE m.user IN (${memberIDs}) AND m.thread = ${request.threadID} `; const [result] = await dbQuery(query); let nonDefaultRoleUser = false; const actualMemberIDs = []; for (const row of result) { if (row.role <= 0) { continue; } actualMemberIDs.push(row.user.toString()); if (row.role !== row.default_role) { nonDefaultRoleUser = true; } } if (nonDefaultRoleUser) { const hasChangeRolePermission = await checkThreadPermission( viewer, request.threadID, threadPermissions.CHANGE_ROLE, ); if (!hasChangeRolePermission) { throw new ServerError('invalid_credentials'); } } const changeset = await changeRole(request.threadID, actualMemberIDs, 0); const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset); const newMessageInfos = await (async () => { if (actualMemberIDs.length === 0) { return []; } const messageData = { type: messageTypes.REMOVE_MEMBERS, threadID: request.threadID, creatorID: viewerID, time: Date.now(), removedUserIDs: actualMemberIDs, }; return await createMessages(viewer, [messageData]); })(); return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } async function leaveThread( viewer: Viewer, request: LeaveThreadRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const [fetchThreadResult, hasPermission] = await Promise.all([ - fetchThreadInfos(viewer, SQL`t.id = ${request.threadID}`), + fetchThreadInfos(viewer, { threadID: request.threadID }), checkThreadPermission( viewer, request.threadID, threadPermissions.LEAVE_THREAD, ), ]); const threadInfo = fetchThreadResult.threadInfos[request.threadID]; if (!viewerIsMember(threadInfo)) { return { updatesResult: { newUpdates: [] }, }; } if (!hasPermission) { throw new ServerError('invalid_parameters'); } const viewerID = viewer.userID; if (threadHasAdminRole(threadInfo)) { let otherUsersExist = false; let otherAdminsExist = false; for (const member of threadInfo.members) { const role = member.role; if (!role || member.id === viewerID) { continue; } otherUsersExist = true; if (roleIsAdminRole(threadInfo.roles[role])) { otherAdminsExist = true; break; } } if (otherUsersExist && !otherAdminsExist) { throw new ServerError('invalid_parameters'); } } const changeset = await changeRole(request.threadID, [viewerID], 0); const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset); const messageData = { type: messageTypes.LEAVE_THREAD, threadID: request.threadID, creatorID: viewerID, time: Date.now(), }; await createMessages(viewer, [messageData]); return { updatesResult: { newUpdates: viewerUpdates } }; } type UpdateThreadOptions = Shape<{ +forceAddMembers: boolean, +forceUpdateRoot: boolean, +silenceMessages: boolean, +ignorePermissions: boolean, }>; async function updateThread( viewer: Viewer, request: UpdateThreadRequest, options?: UpdateThreadOptions, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const forceAddMembers = options?.forceAddMembers ?? false; const forceUpdateRoot = options?.forceUpdateRoot ?? false; const silenceMessages = options?.silenceMessages ?? false; const ignorePermissions = (options?.ignorePermissions && viewer.isScriptViewer) ?? false; const validationPromises = {}; const changedFields = {}; const sqlUpdate = {}; const untrimmedName = request.changes.name; if (untrimmedName !== undefined && untrimmedName !== null) { const name = firstLine(untrimmedName); if (name.search(validChatNameRegex) === -1) { throw new ServerError('invalid_chat_name'); } changedFields.name = name; sqlUpdate.name = name ?? null; } const { description } = request.changes; if (description !== undefined && description !== null) { changedFields.description = description; sqlUpdate.description = description ?? null; } if (request.changes.color) { const color = request.changes.color.toLowerCase(); changedFields.color = color; sqlUpdate.color = color; } const { parentThreadID } = request.changes; if (parentThreadID !== undefined) { // TODO some sort of message when this changes sqlUpdate.parent_thread_id = parentThreadID; } const { avatar } = request.changes; if (avatar) { changedFields.avatar = avatar.type !== 'remove' ? JSON.stringify(avatar) : ''; sqlUpdate.avatar = avatar.type !== 'remove' ? JSON.stringify(avatar) : null; } const threadType = request.changes.type; if (threadType !== null && threadType !== undefined) { changedFields.type = threadType; sqlUpdate.type = threadType; } if ( !ignorePermissions && threadType !== null && threadType !== undefined && threadType !== threadTypes.COMMUNITY_OPEN_SUBTHREAD && threadType !== threadTypes.COMMUNITY_SECRET_SUBTHREAD ) { throw new ServerError('invalid_parameters'); } const newMemberIDs = request.changes.newMemberIDs && request.changes.newMemberIDs.length > 0 ? [...new Set(request.changes.newMemberIDs)] : null; if ( Object.keys(sqlUpdate).length === 0 && !newMemberIDs && !forceUpdateRoot ) { throw new ServerError('invalid_parameters'); } - validationPromises.serverThreadInfos = fetchServerThreadInfos( - SQL`t.id = ${request.threadID}`, - ); + validationPromises.serverThreadInfos = fetchServerThreadInfos({ + threadID: request.threadID, + }); validationPromises.hasNecessaryPermissions = (async () => { if (ignorePermissions) { return; } const checks = []; if (sqlUpdate.name !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_NAME, }); } if (sqlUpdate.description !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_DESCRIPTION, }); } if (sqlUpdate.color !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_COLOR, }); } if (sqlUpdate.avatar !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_THREAD_AVATAR, }); } if (parentThreadID !== undefined || sqlUpdate.type !== undefined) { checks.push({ check: 'permission', permission: threadPermissions.EDIT_PERMISSIONS, }); } if (newMemberIDs) { checks.push({ check: 'permission', permission: threadPermissions.ADD_MEMBERS, }); } const hasNecessaryPermissions = await checkThread( viewer, request.threadID, checks, ); if (!hasNecessaryPermissions) { throw new ServerError('invalid_credentials'); } })(); const { serverThreadInfos } = await promiseAll(validationPromises); const serverThreadInfo = serverThreadInfos.threadInfos[request.threadID]; if (!serverThreadInfo) { throw new ServerError('internal_error'); } // Threads with source message should be visible to everyone, but we can't // guarantee it for COMMUNITY_SECRET_SUBTHREAD threads so we forbid it for // now. In the future, if we want to support this, we would need to unlink the // source message. if ( threadType !== null && threadType !== undefined && threadType !== threadTypes.SIDEBAR && threadType !== threadTypes.COMMUNITY_OPEN_SUBTHREAD && serverThreadInfo.sourceMessageID ) { throw new ServerError('invalid_parameters'); } // You can't change the parent thread of a current or former SIDEBAR if (parentThreadID !== undefined && serverThreadInfo.sourceMessageID) { throw new ServerError('invalid_parameters'); } const oldThreadType = serverThreadInfo.type; const oldParentThreadID = serverThreadInfo.parentThreadID; const oldContainingThreadID = serverThreadInfo.containingThreadID; const oldCommunity = serverThreadInfo.community; const oldDepth = serverThreadInfo.depth; const nextThreadType = threadType !== null && threadType !== undefined ? threadType : oldThreadType; let nextParentThreadID = parentThreadID !== undefined ? parentThreadID : oldParentThreadID; // Does the new thread type preclude a parent? if ( threadType !== undefined && threadType !== null && getThreadTypeParentRequirement(threadType) === 'disabled' && nextParentThreadID !== null ) { nextParentThreadID = null; sqlUpdate.parent_thread_id = null; } // Does the new thread type require a parent? if ( threadType !== undefined && threadType !== null && getThreadTypeParentRequirement(threadType) === 'required' && nextParentThreadID === null ) { throw new ServerError('no_parent_thread_specified'); } const determineThreadAncestryPromise = determineThreadAncestry( nextParentThreadID, nextThreadType, ); const confirmParentPermissionPromise = (async () => { if (ignorePermissions || !nextParentThreadID) { return; } if ( nextParentThreadID === oldParentThreadID && (nextThreadType === threadTypes.SIDEBAR) === (oldThreadType === threadTypes.SIDEBAR) ) { return; } const hasParentPermission = await checkThreadPermission( viewer, nextParentThreadID, nextThreadType === threadTypes.SIDEBAR ? threadPermissions.CREATE_SIDEBARS : threadPermissions.CREATE_SUBCHANNELS, ); if (!hasParentPermission) { throw new ServerError('invalid_parameters'); } })(); const rolesNeedUpdate = forceUpdateRoot || nextThreadType !== oldThreadType; const validateNewMembersPromise = (async () => { if (!newMemberIDs || ignorePermissions) { return; } const defaultRolePermissionsPromise = (async () => { let rolePermissions; if (!rolesNeedUpdate) { const rolePermissionsQuery = SQL` SELECT r.permissions FROM threads t LEFT JOIN roles r ON r.id = t.default_role WHERE t.id = ${request.threadID} `; const [result] = await dbQuery(rolePermissionsQuery); if (result.length > 0) { rolePermissions = JSON.parse(result[0].permissions); } } if (!rolePermissions) { rolePermissions = getRolePermissionBlobs(nextThreadType).Members; } return rolePermissions; })(); const [defaultRolePermissions, nextThreadAncestry] = await Promise.all([ defaultRolePermissionsPromise, determineThreadAncestryPromise, ]); const { newMemberIDs: validatedIDs } = await validateCandidateMembers( viewer, { newMemberIDs }, { threadType: nextThreadType, parentThreadID: nextParentThreadID, containingThreadID: nextThreadAncestry.containingThreadID, defaultRolePermissions, }, { requireRelationship: !forceAddMembers }, ); if ( validatedIDs && Number(validatedIDs?.length) < Number(newMemberIDs?.length) ) { throw new ServerError('invalid_credentials'); } })(); const { nextThreadAncestry } = await promiseAll({ nextThreadAncestry: determineThreadAncestryPromise, confirmParentPermissionPromise, validateNewMembersPromise, }); if (nextThreadAncestry.containingThreadID !== oldContainingThreadID) { sqlUpdate.containing_thread_id = nextThreadAncestry.containingThreadID; } if (nextThreadAncestry.community !== oldCommunity) { if (!ignorePermissions) { throw new ServerError('invalid_parameters'); } sqlUpdate.community = nextThreadAncestry.community; } if (nextThreadAncestry.depth !== oldDepth) { sqlUpdate.depth = nextThreadAncestry.depth; } const updateQueryPromise = (async () => { if (Object.keys(sqlUpdate).length === 0) { return; } const { avatar: avatarUpdate, ...nonAvatarUpdates } = sqlUpdate; const updatePromises = []; if (Object.keys(nonAvatarUpdates).length > 0) { const nonAvatarUpdateQuery = SQL` UPDATE threads SET ${nonAvatarUpdates} WHERE id = ${request.threadID} `; updatePromises.push(dbQuery(nonAvatarUpdateQuery)); } if (avatarUpdate !== undefined) { const avatarUploadID = avatar && avatar.type === 'image' ? avatar.uploadID : null; const avatarUpdateQuery = SQL` START TRANSACTION; UPDATE uploads SET container = NULL WHERE container = ${request.threadID} AND ( ${avatarUploadID} IS NULL OR EXISTS ( SELECT 1 FROM uploads WHERE id = ${avatarUploadID} AND ${avatarUploadID} IS NOT NULL AND uploader = ${viewer.userID} AND container IS NULL AND thread IS NULL ) ); UPDATE uploads SET container = ${request.threadID} WHERE id = ${avatarUploadID} AND ${avatarUploadID} IS NOT NULL AND uploader = ${viewer.userID} AND container IS NULL AND thread IS NULL; UPDATE threads SET avatar = ${avatarUpdate} WHERE id = ${request.threadID} AND ( ${avatarUploadID} IS NULL OR EXISTS ( SELECT 1 FROM uploads WHERE id = ${avatarUploadID} AND ${avatarUploadID} IS NOT NULL AND uploader = ${viewer.userID} AND container = ${request.threadID} AND thread IS NULL ) ); COMMIT; `; updatePromises.push( dbQuery(avatarUpdateQuery, { multipleStatements: true }), ); } await Promise.all(updatePromises); })(); const updateRolesPromise = (async () => { if (rolesNeedUpdate) { await updateRoles(viewer, request.threadID, nextThreadType); } })(); const intermediatePromises = {}; intermediatePromises.updateQuery = updateQueryPromise; intermediatePromises.updateRoles = updateRolesPromise; if (newMemberIDs) { intermediatePromises.addMembersChangeset = (async () => { await Promise.all([updateQueryPromise, updateRolesPromise]); return await changeRole(request.threadID, newMemberIDs, null, { setNewMembersToUnread: true, }); })(); } const threadRootChanged = rolesNeedUpdate || nextParentThreadID !== oldParentThreadID; if (threadRootChanged) { intermediatePromises.recalculatePermissionsChangeset = (async () => { await Promise.all([updateQueryPromise, updateRolesPromise]); return await recalculateThreadPermissions(request.threadID); })(); } const { addMembersChangeset, recalculatePermissionsChangeset } = await promiseAll(intermediatePromises); const membershipRows = []; const relationshipChangeset = new RelationshipChangeset(); if (recalculatePermissionsChangeset) { const { membershipRows: recalculateMembershipRows, relationshipChangeset: recalculateRelationshipChangeset, } = recalculatePermissionsChangeset; membershipRows.push(...recalculateMembershipRows); relationshipChangeset.addAll(recalculateRelationshipChangeset); } let addedMemberIDs; if (addMembersChangeset) { const { membershipRows: addMembersMembershipRows, relationshipChangeset: addMembersRelationshipChangeset, } = addMembersChangeset; addedMemberIDs = addMembersMembershipRows .filter( row => row.operation === 'save' && row.threadID === request.threadID && Number(row.role) > 0, ) .map(row => row.userID); membershipRows.push(...addMembersMembershipRows); relationshipChangeset.addAll(addMembersRelationshipChangeset); } const changeset = { membershipRows, relationshipChangeset }; const { viewerUpdates } = await commitMembershipChangeset(viewer, changeset, { // This forces an update for this thread, // regardless of whether any membership rows are changed changedThreadIDs: Object.keys(sqlUpdate).length > 0 ? new Set([request.threadID]) : new Set(), }); let newMessageInfos = []; if (!silenceMessages) { const time = Date.now(); const messageDatas = []; for (const fieldName in changedFields) { const newValue = changedFields[fieldName]; messageDatas.push({ type: messageTypes.CHANGE_SETTINGS, threadID: request.threadID, creatorID: viewer.userID, time, field: fieldName, value: newValue, }); } if (addedMemberIDs && addedMemberIDs.length > 0) { messageDatas.push({ type: messageTypes.ADD_MEMBERS, threadID: request.threadID, creatorID: viewer.userID, time, addedUserIDs: addedMemberIDs, }); } newMessageInfos = await createMessages(viewer, messageDatas); } return { updatesResult: { newUpdates: viewerUpdates }, newMessageInfos }; } async function joinThread( viewer: Viewer, request: ServerThreadJoinRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const permissionCheck = request.inviteLinkSecret ? checkIfInviteLinkIsValid(request.inviteLinkSecret, request.threadID) : checkThreadPermission( viewer, request.threadID, threadPermissions.JOIN_THREAD, ); const [isMember, hasPermission] = await Promise.all([ fetchViewerIsMember(viewer, request.threadID), permissionCheck, ]); if (!hasPermission) { throw new ServerError('invalid_parameters'); } const { calendarQuery } = request; if (isMember) { const response: ThreadJoinResult = { rawMessageInfos: [], truncationStatuses: {}, userInfos: {}, updatesResult: { newUpdates: [], }, }; return response; } if (calendarQuery) { const threadFilterIDs = filteredThreadIDs(calendarQuery.filters); if ( !threadFilterIDs || threadFilterIDs.size !== 1 || threadFilterIDs.values().next().value !== request.threadID ) { throw new ServerError('invalid_parameters'); } } const changeset = await changeRole(request.threadID, [viewer.userID], null); const membershipResult = await commitMembershipChangeset(viewer, changeset, { calendarQuery, }); if (request.inviteLinkSecret) { handleAsyncPromise(reportLinkUsage(request.inviteLinkSecret)); } const messageData = { type: messageTypes.JOIN_THREAD, threadID: request.threadID, creatorID: viewer.userID, time: Date.now(), }; const newMessages = await createMessages(viewer, [messageData]); return { rawMessageInfos: newMessages, truncationStatuses: {}, userInfos: membershipResult.userInfos, updatesResult: { newUpdates: membershipResult.viewerUpdates, }, }; } async function toggleMessagePinForThread( viewer: Viewer, request: ToggleMessagePinRequest, ): Promise { const { messageID, action } = request; const targetMessage = await fetchMessageInfoByID(viewer, messageID); if (!targetMessage) { throw new ServerError('invalid_parameters'); } const { threadID } = targetMessage; const hasPermission = await checkThreadPermission( viewer, threadID, threadPermissions.MANAGE_PINS, ); if (!hasPermission) { throw new ServerError('invalid_credentials'); } const pinnedValue = action === 'pin' ? 1 : 0; const pinTimeValue = action === 'pin' ? Date.now() : null; const togglePinQuery = SQL` UPDATE messages SET pinned = ${pinnedValue}, pin_time = ${pinTimeValue} WHERE id = ${messageID} AND thread = ${threadID} `; const messageData = { type: messageTypes.TOGGLE_PIN, threadID, targetMessageID: messageID, action, pinnedContent: getPinnedContentFromMessage(targetMessage), creatorID: viewer.userID, time: Date.now(), }; let updateThreadQuery; if (action === 'pin') { updateThreadQuery = SQL` UPDATE threads SET pinned_count = pinned_count + 1 WHERE id = ${threadID} `; } else { updateThreadQuery = SQL` UPDATE threads SET pinned_count = pinned_count - 1 WHERE id = ${threadID} `; } const [{ threadInfos: serverThreadInfos }] = await Promise.all([ - fetchServerThreadInfos(SQL`t.id = ${threadID}`), + fetchServerThreadInfos({ threadID }), dbQuery(togglePinQuery), dbQuery(updateThreadQuery), ]); const newMessageInfos = await createMessages(viewer, [messageData]); const time = Date.now(); const updates = []; for (const member of serverThreadInfos[threadID].members) { updates.push({ userID: member.id, time, threadID, type: updateTypes.UPDATE_THREAD, }); } await createUpdates(updates); return { newMessageInfos, threadID, }; } export { updateRole, removeMembers, leaveThread, updateThread, joinThread, toggleMessagePinForThread, }; diff --git a/keyserver/src/updaters/user-subscription-updaters.js b/keyserver/src/updaters/user-subscription-updaters.js index ccb0a8350..3be9186d4 100644 --- a/keyserver/src/updaters/user-subscription-updaters.js +++ b/keyserver/src/updaters/user-subscription-updaters.js @@ -1,63 +1,62 @@ // @flow import { viewerIsMember } from 'lib/shared/thread-utils.js'; import type { ThreadSubscription, SubscriptionUpdateRequest, } from 'lib/types/subscription-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { ServerError } from 'lib/utils/errors.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import type { Viewer } from '../session/viewer.js'; async function userSubscriptionUpdater( viewer: Viewer, update: SubscriptionUpdateRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } - const { threadInfos } = await fetchThreadInfos( - viewer, - SQL`t.id = ${update.threadID}`, - ); + const { threadInfos } = await fetchThreadInfos(viewer, { + threadID: update.threadID, + }); const threadInfo = threadInfos[update.threadID]; if (!viewerIsMember(threadInfo)) { throw new ServerError('not_member'); } const promises = []; const newSubscription = { ...threadInfo.currentUser.subscription, ...update.updatedFields, }; const saveQuery = SQL` UPDATE memberships SET subscription = ${JSON.stringify(newSubscription)} WHERE user = ${viewer.userID} AND thread = ${update.threadID} `; promises.push(dbQuery(saveQuery)); const time = Date.now(); const updateDatas = [ { type: updateTypes.UPDATE_THREAD, userID: viewer.userID, time, threadID: update.threadID, }, ]; promises.push( createUpdates(updateDatas, { viewer, updatesForCurrentSession: 'ignore' }), ); await Promise.all(promises); return newSubscription; } export { userSubscriptionUpdater };