diff --git a/keyserver/src/creators/session-creator.js b/keyserver/src/creators/session-creator.js index 040faacea..2166153b4 100644 --- a/keyserver/src/creators/session-creator.js +++ b/keyserver/src/creators/session-creator.js @@ -1,40 +1,53 @@ // @flow import type { CalendarQuery } from 'lib/types/entry-types'; import { dbQuery, SQL } from '../database/database'; +import { getDBType } from '../database/db-config'; import type { Viewer } from '../session/viewer'; async function createSession( viewer: Viewer, calendarQuery: CalendarQuery, initialLastUpdate: number, ): Promise { const time = Date.now(); const row = [ viewer.session, viewer.userID, viewer.cookieID, JSON.stringify(calendarQuery), time, initialLastUpdate, time, ]; const query = SQL` INSERT INTO sessions (id, user, cookie, query, creation_time, last_update, last_validated) VALUES ${[row]} + `; + const dbType = await getDBType(); + if (dbType === 'mysql5.7') { + query.append(SQL` ON DUPLICATE KEY UPDATE query = VALUES(query), last_update = VALUES(last_update), last_validated = VALUES(last_validated) - `; + `); + } else { + query.append(SQL` + ON DUPLICATE KEY UPDATE + query = VALUE(query), + last_update = VALUE(last_update), + last_validated = VALUE(last_validated) + `); + } await dbQuery(query); viewer.setSessionInfo({ lastValidated: time, lastUpdate: initialLastUpdate, calendarQuery, }); } export { createSession }; diff --git a/keyserver/src/updaters/account-updaters.js b/keyserver/src/updaters/account-updaters.js index 0ec0eacb1..08cdf5403 100644 --- a/keyserver/src/updaters/account-updaters.js +++ b/keyserver/src/updaters/account-updaters.js @@ -1,112 +1,122 @@ // @flow import bcrypt from 'twin-bcrypt'; import type { ResetPasswordRequest, UpdatePasswordRequest, UpdateUserSettingsRequest, LogInResponse, } from 'lib/types/account-types'; import { updateTypes } from 'lib/types/update-types'; import type { PasswordUpdate } from 'lib/types/user-types'; import { ServerError } from 'lib/utils/errors'; import { createUpdates } from '../creators/update-creator'; import { dbQuery, SQL } from '../database/database'; +import { getDBType } from '../database/db-config'; import type { Viewer } from '../session/viewer'; async function accountUpdater( viewer: Viewer, update: PasswordUpdate, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const newPassword = update.updatedFields.password; if (!newPassword) { // If it's an old client it may have given us an email, // but we don't store those anymore return; } const verifyQuery = SQL` SELECT username, hash FROM users WHERE id = ${viewer.userID} `; const [verifyResult] = await dbQuery(verifyQuery); if (verifyResult.length === 0) { throw new ServerError('internal_error'); } const verifyRow = verifyResult[0]; if (!bcrypt.compareSync(update.currentPassword, verifyRow.hash)) { throw new ServerError('invalid_credentials'); } const changedFields = { hash: bcrypt.hashSync(newPassword) }; const saveQuery = SQL` UPDATE users SET ${changedFields} WHERE id = ${viewer.userID} `; await dbQuery(saveQuery); const updateDatas = [ { type: updateTypes.UPDATE_CURRENT_USER, userID: viewer.userID, time: Date.now(), }, ]; await createUpdates(updateDatas, { viewer, updatesForCurrentSession: 'broadcast', }); } // eslint-disable-next-line no-unused-vars async function checkAndSendVerificationEmail(viewer: Viewer): Promise { // We don't want to crash old clients that call this, // but we have nothing we can do because we no longer store email addresses } async function checkAndSendPasswordResetEmail( // eslint-disable-next-line no-unused-vars request: ResetPasswordRequest, ): Promise { // We don't want to crash old clients that call this, // but we have nothing we can do because we no longer store email addresses } /* eslint-disable no-unused-vars */ async function updatePassword( viewer: Viewer, request: UpdatePasswordRequest, ): Promise { /* eslint-enable no-unused-vars */ // We have no way to handle this request anymore throw new ServerError('deprecated'); } async function updateUserSettings( viewer: Viewer, request: UpdateUserSettingsRequest, ) { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const createOrUpdateSettingsQuery = SQL` INSERT INTO settings (user, name, data) VALUES ${[[viewer.id, request.name, request.data]]} - ON DUPLICATE KEY UPDATE data = VALUES(data) `; + const dbType = await getDBType(); + if (dbType === 'mysql5.7') { + createOrUpdateSettingsQuery.append(SQL` + ON DUPLICATE KEY UPDATE data = VALUES(data) + `); + } else { + createOrUpdateSettingsQuery.append(SQL` + ON DUPLICATE KEY UPDATE data = VALUE(data) + `); + } await dbQuery(createOrUpdateSettingsQuery); } export { accountUpdater, checkAndSendVerificationEmail, checkAndSendPasswordResetEmail, updateUserSettings, updatePassword, }; diff --git a/keyserver/src/updaters/activity-updaters.js b/keyserver/src/updaters/activity-updaters.js index 2a3cbe0a4..28dba54ab 100644 --- a/keyserver/src/updaters/activity-updaters.js +++ b/keyserver/src/updaters/activity-updaters.js @@ -1,517 +1,528 @@ // @flow import invariant from 'invariant'; import _difference from 'lodash/fp/difference'; import _max from 'lodash/fp/max'; import { localIDPrefix } from 'lib/shared/message-utils'; import type { UpdateActivityResult, UpdateActivityRequest, SetThreadUnreadStatusRequest, SetThreadUnreadStatusResult, } from 'lib/types/activity-types'; import { messageTypes } from 'lib/types/message-types'; import { threadPermissions } from 'lib/types/thread-types'; import { updateTypes } from 'lib/types/update-types'; import { ServerError } from 'lib/utils/errors'; import { createUpdates } from '../creators/update-creator'; import { dbQuery, SQL, mergeOrConditions } from '../database/database'; +import { getDBType } from '../database/db-config'; import type { SQLStatementType } from '../database/types'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters'; import { checkThread, getValidThreads, } from '../fetchers/thread-permission-fetchers'; import { rescindPushNotifs } from '../push/rescind'; import { updateBadgeCount } from '../push/send'; import type { Viewer } from '../session/viewer'; import { earliestFocusedTimeConsideredExpired } from '../shared/focused-times'; type PartialThreadStatus = { +focusActive: boolean, +threadID: string, +newLastReadMessage: ?number, }; type ThreadStatus = | { +focusActive: true, +threadID: string, +newLastReadMessage: number, +curLastReadMessage: number, +rescindCondition: SQLStatementType, } | { +focusActive: false, +threadID: string, +newLastReadMessage: ?number, +curLastReadMessage: number, +rescindCondition: ?SQLStatementType, +newerMessageFromOtherAuthor: boolean, }; async function activityUpdater( viewer: Viewer, request: UpdateActivityRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const focusUpdatesByThreadID = new Map(); for (const activityUpdate of request.updates) { const threadID = activityUpdate.threadID; const updatesForThreadID = focusUpdatesByThreadID.get(threadID) ?? []; if (!focusUpdatesByThreadID.has(threadID)) { focusUpdatesByThreadID.set(threadID, updatesForThreadID); } updatesForThreadID.push(activityUpdate); } const unverifiedThreadIDs: $ReadOnlySet = new Set( request.updates.map(update => update.threadID), ); const verifiedThreadsData = await getValidThreads( viewer, [...unverifiedThreadIDs], [ { check: 'permission', permission: threadPermissions.VISIBLE, }, ], ); if (verifiedThreadsData.length === 0) { return { unfocusedToUnread: [] }; } const memberThreadIDs = new Set(); const verifiedThreadIDs = []; for (const threadData of verifiedThreadsData) { if (threadData.role > 0) { memberThreadIDs.add(threadData.threadID); } verifiedThreadIDs.push(threadData.threadID); } const partialThreadStatuses: PartialThreadStatus[] = []; for (const threadID of verifiedThreadIDs) { const focusUpdates = focusUpdatesByThreadID.get(threadID); invariant(focusUpdates, `no focusUpdate for thread ID ${threadID}`); const focusActive = !focusUpdates.some(update => !update.focus); const newLastReadMessage = _max( focusUpdates .filter( update => update.latestMessage && !update.latestMessage.startsWith(localIDPrefix), ) .map(update => parseInt(update.latestMessage)), ); partialThreadStatuses.push({ threadID, focusActive, newLastReadMessage, }); } // We update the focused rows before we check for new messages so we can // guarantee that any messages that may have set the thread to unread before // we set it to focused are caught and overriden await updateFocusedRows(viewer, partialThreadStatuses); if (memberThreadIDs.size === 0) { return { unfocusedToUnread: [] }; } const memberPartialThreadStatuses = partialThreadStatuses.filter( partialStatus => memberThreadIDs.has(partialStatus.threadID), ); const unfocusedLatestMessages = new Map(); for (const partialThreadStatus of memberPartialThreadStatuses) { const { threadID, focusActive, newLastReadMessage } = partialThreadStatus; if (!focusActive) { unfocusedLatestMessages.set(threadID, newLastReadMessage ?? 0); } } const [ unfocusedThreadsWithNewerMessages, lastMessageInfos, ] = await Promise.all([ checkForNewerMessages(viewer, unfocusedLatestMessages), fetchLastMessageInfo(viewer, [...memberThreadIDs]), ]); const threadStatuses: ThreadStatus[] = []; for (const partialThreadStatus of memberPartialThreadStatuses) { const { threadID, focusActive, newLastReadMessage } = partialThreadStatus; const lastMessageInfo = lastMessageInfos.get(threadID); invariant( lastMessageInfo !== undefined, `no lastMessageInfo for thread ID ${threadID}`, ); const { lastMessage, lastReadMessage: curLastReadMessage, } = lastMessageInfo; if (focusActive) { threadStatuses.push({ focusActive: true, threadID, newLastReadMessage: newLastReadMessage ? Math.max(lastMessage, newLastReadMessage) : lastMessage, curLastReadMessage, rescindCondition: SQL`n.thread = ${threadID}`, }); } else { threadStatuses.push({ focusActive: false, threadID, newLastReadMessage, curLastReadMessage, rescindCondition: newLastReadMessage ? SQL`(n.thread = ${threadID} AND n.message <= ${newLastReadMessage})` : null, newerMessageFromOtherAuthor: unfocusedThreadsWithNewerMessages.has( threadID, ), }); } } // The following block determines whether to enqueue updates for a given // (user, thread) pair and whether to propagate badge count notifs to all of // that user's devices const setUnread: Array<{ +threadID: string, +unread: boolean }> = []; for (const threadStatus of threadStatuses) { const { threadID, curLastReadMessage } = threadStatus; if (!threadStatus.focusActive) { const { newLastReadMessage, newerMessageFromOtherAuthor } = threadStatus; if (newerMessageFromOtherAuthor) { setUnread.push({ threadID, unread: true }); } else if (!newLastReadMessage) { // This is a rare edge case. It should only be possible for threads that // have zero messages on both the client and server, which shouldn't // happen. In this case we'll set the thread to read, just in case... console.warn(`thread ID ${threadID} appears to have no messages`); setUnread.push({ threadID, unread: false }); } else if (newLastReadMessage > curLastReadMessage) { setUnread.push({ threadID, unread: false }); } } else { const { newLastReadMessage } = threadStatus; if (newLastReadMessage > curLastReadMessage) { setUnread.push({ threadID, unread: false }); } } } const time = Date.now(); const updateDatas = setUnread.map(({ threadID, unread }) => ({ type: updateTypes.UPDATE_THREAD_READ_STATUS, userID: viewer.userID, time, threadID, unread, })); const latestMessages = new Map(); for (const threadStatus of threadStatuses) { const { threadID, newLastReadMessage, curLastReadMessage } = threadStatus; if (newLastReadMessage && newLastReadMessage > curLastReadMessage) { latestMessages.set(threadID, newLastReadMessage); } } await Promise.all([ updateLastReadMessage(viewer, latestMessages), createUpdates(updateDatas, { viewer, updatesForCurrentSession: 'ignore' }), ]); // We do this afterwards so the badge count is correct const rescindConditions = threadStatuses .map(({ rescindCondition }) => rescindCondition) .filter(Boolean); let rescindCondition; if (rescindConditions.length > 0) { rescindCondition = SQL`n.user = ${viewer.userID} AND `; rescindCondition.append(mergeOrConditions(rescindConditions)); } await rescindAndUpdateBadgeCounts( viewer, rescindCondition, updateDatas.length > 0 ? 'activity_update' : null, ); return { unfocusedToUnread: [...unfocusedThreadsWithNewerMessages] }; } async function updateFocusedRows( viewer: Viewer, partialThreadStatuses: $ReadOnlyArray, ): Promise { const threadIDs = partialThreadStatuses .filter(threadStatus => threadStatus.focusActive) .map(({ threadID }) => threadID); const time = Date.now(); if (threadIDs.length > 0) { const focusedInsertRows = threadIDs.map(threadID => [ viewer.userID, viewer.session, threadID, time, ]); - await dbQuery(SQL` + const query = SQL` INSERT INTO focused (user, session, thread, time) VALUES ${focusedInsertRows} + `; + const dbType = await getDBType(); + if (dbType === 'mysql5.7') { + query.append(SQL` ON DUPLICATE KEY UPDATE time = VALUES(time) - `); + `); + } else { + query.append(SQL` + ON DUPLICATE KEY UPDATE time = VALUE(time) + `); + } + await dbQuery(query); } if (viewer.hasSessionInfo) { await deleteActivityForViewerSession(viewer, time); } } // To protect against a possible race condition, we reset the thread to unread // if the latest message ID on the client at the time that focus was dropped // is no longer the latest message ID. // Returns the set of unfocused threads that should be set to unread on // the client because a new message arrived since they were unfocused. async function checkForNewerMessages( viewer: Viewer, latestMessages: Map, ): Promise> { if (latestMessages.size === 0 || !viewer.loggedIn) { return new Set(); } const unfocusedThreadIDs = [...latestMessages.keys()]; const focusedElsewhereThreadIDs = await checkThreadsFocused( viewer, unfocusedThreadIDs, ); const unreadCandidates = _difference(unfocusedThreadIDs)( focusedElsewhereThreadIDs, ); if (unreadCandidates.length === 0) { return new Set(); } const knowOfExtractString = `$.${threadPermissions.KNOW_OF}.value`; const query = SQL` SELECT m.thread, MAX(m.id) AS latest_message FROM messages m LEFT JOIN memberships stm ON m.type = ${messageTypes.CREATE_SUB_THREAD} AND stm.thread = m.content AND stm.user = ${viewer.userID} WHERE m.thread IN (${unreadCandidates}) AND m.user != ${viewer.userID} AND ( m.type != ${messageTypes.CREATE_SUB_THREAD} OR JSON_EXTRACT(stm.permissions, ${knowOfExtractString}) IS TRUE ) GROUP BY m.thread `; const [result] = await dbQuery(query); const threadsWithNewerMessages = new Set(); for (const row of result) { const threadID = row.thread.toString(); const serverLatestMessage = row.latest_message; const clientLatestMessage = latestMessages.get(threadID); if (clientLatestMessage < serverLatestMessage) { threadsWithNewerMessages.add(threadID); } } return threadsWithNewerMessages; } async function checkThreadsFocused( viewer: Viewer, threadIDs: $ReadOnlyArray, ): Promise { const time = earliestFocusedTimeConsideredExpired(); const query = SQL` SELECT thread FROM focused WHERE time > ${time} AND user = ${viewer.userID} AND thread IN (${threadIDs}) GROUP BY thread `; const [result] = await dbQuery(query); const focusedThreadIDs = []; for (const row of result) { focusedThreadIDs.push(row.thread.toString()); } return focusedThreadIDs; } async function updateLastReadMessage( viewer: Viewer, lastReadMessages: $ReadOnlyMap, ) { if (lastReadMessages.size === 0) { return; } const query = SQL` UPDATE memberships SET last_read_message = GREATEST(last_read_message, CASE `; lastReadMessages.forEach((lastMessage, threadID) => { query.append(SQL` WHEN thread = ${threadID} THEN ${lastMessage} `); }); query.append(SQL` ELSE last_read_message END) WHERE thread IN (${[...lastReadMessages.keys()]}) AND user = ${viewer.userID} `); return await dbQuery(query); } async function setThreadUnreadStatus( viewer: Viewer, request: SetThreadUnreadStatusRequest, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const isMemberAndCanViewThread = await checkThread(viewer, request.threadID, [ { check: 'is_member', }, { check: 'permission', permission: threadPermissions.VISIBLE, }, ]); if (!isMemberAndCanViewThread) { throw new ServerError('invalid_parameters'); } const resetThreadToUnread = await shouldResetThreadToUnread(viewer, request); if (!resetThreadToUnread) { const lastReadMessage = request.unread ? SQL`0` : SQL`GREATEST(m.last_read_message, ${request.latestMessage ?? 0})`; const update = SQL` UPDATE memberships m SET m.last_read_message = `; update.append(lastReadMessage); update.append(SQL` WHERE m.thread = ${request.threadID} AND m.user = ${viewer.userID} `); const queryPromise = dbQuery(update); const time = Date.now(); const updatesPromise = createUpdates( [ { type: updateTypes.UPDATE_THREAD_READ_STATUS, userID: viewer.userID, time: time, threadID: request.threadID, unread: request.unread, }, ], { viewer, updatesForCurrentSession: 'ignore' }, ); await Promise.all([updatesPromise, queryPromise]); } let rescindCondition; if (!request.unread) { rescindCondition = SQL` n.user = ${viewer.userID} AND n.thread = ${request.threadID} AND n.message <= ${request.latestMessage} `; } await rescindAndUpdateBadgeCounts( viewer, rescindCondition, request.unread ? 'mark_as_unread' : 'mark_as_read', ); return { resetToUnread: resetThreadToUnread, }; } async function rescindAndUpdateBadgeCounts( viewer: Viewer, rescindCondition: ?SQLStatementType, badgeCountUpdateSource: ?( | 'activity_update' | 'mark_as_unread' | 'mark_as_read' ), ) { const excludeDeviceTokens = []; if (rescindCondition) { const handledDeviceTokens = await rescindPushNotifs(rescindCondition); excludeDeviceTokens.push(...handledDeviceTokens); } if (badgeCountUpdateSource) { await updateBadgeCount(viewer, badgeCountUpdateSource, excludeDeviceTokens); } } async function shouldResetThreadToUnread( viewer: Viewer, request: SetThreadUnreadStatusRequest, ): Promise { if (request.unread) { return false; } const threadsWithNewerMessages = await checkForNewerMessages( viewer, new Map([[request.threadID, parseInt(request.latestMessage) || 0]]), ); return threadsWithNewerMessages.has(request.threadID); } type LastMessageInfo = { +lastMessage: number, +lastReadMessage: number, }; async function fetchLastMessageInfo( viewer: Viewer, threadIDs: $ReadOnlyArray, ) { const query = SQL` SELECT thread, last_message, last_read_message FROM memberships WHERE user = ${viewer.userID} AND thread IN (${threadIDs}) `; const [result] = await dbQuery(query); const lastMessages = new Map(); for (const row of result) { const threadID = row.thread.toString(); const lastMessage = row.last_message; const lastReadMessage = row.last_read_message; lastMessages.set(threadID, { lastMessage, lastReadMessage }); } return lastMessages; } export { activityUpdater, setThreadUnreadStatus }; diff --git a/keyserver/src/updaters/relationship-updaters.js b/keyserver/src/updaters/relationship-updaters.js index 80d832faa..88342d59c 100644 --- a/keyserver/src/updaters/relationship-updaters.js +++ b/keyserver/src/updaters/relationship-updaters.js @@ -1,370 +1,413 @@ // @flow import invariant from 'invariant'; import { sortIDs } from 'lib/shared/relationship-utils'; import { messageTypes } from 'lib/types/message-types'; import { type RelationshipRequest, type RelationshipErrors, type UndirectedRelationshipRow, relationshipActions, undirectedStatus, directedStatus, } from 'lib/types/relationship-types'; import { threadTypes } from 'lib/types/thread-types'; import { updateTypes, type UpdateData } from 'lib/types/update-types'; import { cartesianProduct } from 'lib/utils/array'; import { ServerError } from 'lib/utils/errors'; import { promiseAll } from 'lib/utils/promises'; import createMessages from '../creators/message-creator'; import { createThread } from '../creators/thread-creator'; import { createUpdates } from '../creators/update-creator'; import { dbQuery, SQL, mergeOrConditions } from '../database/database'; +import { getDBType } from '../database/db-config'; import { fetchFriendRequestRelationshipOperations } from '../fetchers/relationship-fetchers'; import { fetchUserInfos } from '../fetchers/user-fetchers'; import type { Viewer } from '../session/viewer'; async function updateRelationships( viewer: Viewer, request: RelationshipRequest, ): Promise { const { action } = request; if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const uniqueUserIDs = [...new Set(request.userIDs)]; - const users = await fetchUserInfos(uniqueUserIDs); + const [users, dbType] = await Promise.all([ + fetchUserInfos(uniqueUserIDs), + getDBType(), + ]); let errors = {}; const userIDs: string[] = []; for (const userID of uniqueUserIDs) { if (userID === viewer.userID || !users[userID].username) { const acc = errors.invalid_user || []; errors.invalid_user = [...acc, userID]; } else { userIDs.push(userID); } } if (!userIDs.length) { return Object.freeze({ ...errors }); } const updateIDs = []; if (action === relationshipActions.FRIEND) { // We have to create personal threads before setting the relationship // status. By doing that we make sure that failed thread creation is // reported to the caller and can be repeated - there should be only // one PERSONAL thread per a pair of users and we can safely call it // repeatedly. const threadIDPerUser = await createPersonalThreads(viewer, request); const { userRelationshipOperations, errors: friendRequestErrors, } = await fetchFriendRequestRelationshipOperations(viewer, userIDs); errors = { ...errors, ...friendRequestErrors }; const undirectedInsertRows = []; const directedInsertRows = []; const directedDeleteIDs = []; const messageDatas = []; const now = Date.now(); for (const userID in userRelationshipOperations) { const operations = userRelationshipOperations[userID]; const ids = sortIDs(viewer.userID, userID); if (operations.length) { updateIDs.push(userID); } for (const operation of operations) { if (operation === 'delete_directed') { directedDeleteIDs.push(userID); } else if (operation === 'friend') { const [user1, user2] = ids; const status = undirectedStatus.FRIEND; undirectedInsertRows.push({ user1, user2, status }); messageDatas.push({ type: messageTypes.UPDATE_RELATIONSHIP, threadID: threadIDPerUser[userID], creatorID: viewer.userID, targetID: userID, time: now, operation: 'request_accepted', }); } else if (operation === 'pending_friend') { const status = directedStatus.PENDING_FRIEND; directedInsertRows.push([viewer.userID, userID, status]); messageDatas.push({ type: messageTypes.UPDATE_RELATIONSHIP, threadID: threadIDPerUser[userID], creatorID: viewer.userID, targetID: userID, time: now, operation: 'request_sent', }); } else if (operation === 'know_of') { const [user1, user2] = ids; const status = undirectedStatus.KNOW_OF; undirectedInsertRows.push({ user1, user2, status }); } else { invariant(false, `unexpected relationship operation ${operation}`); } } } const promises = [updateUndirectedRelationships(undirectedInsertRows)]; if (directedInsertRows.length) { const directedInsertQuery = SQL` INSERT INTO relationships_directed (user1, user2, status) VALUES ${directedInsertRows} - ON DUPLICATE KEY UPDATE status = VALUES(status) `; + if (dbType === 'mysql5.7') { + directedInsertQuery.append(SQL` + ON DUPLICATE KEY UPDATE status = VALUES(status) + `); + } else { + directedInsertQuery.append(SQL` + ON DUPLICATE KEY UPDATE status = VALUE(status) + `); + } promises.push(dbQuery(directedInsertQuery)); } if (directedDeleteIDs.length) { const directedDeleteQuery = SQL` DELETE FROM relationships_directed WHERE (user1 = ${viewer.userID} AND user2 IN (${directedDeleteIDs})) OR (status = ${directedStatus.PENDING_FRIEND} AND user1 IN (${directedDeleteIDs}) AND user2 = ${viewer.userID}) `; promises.push(dbQuery(directedDeleteQuery)); } if (messageDatas.length > 0) { promises.push(createMessages(viewer, messageDatas, 'broadcast')); } await Promise.all(promises); } else if (action === relationshipActions.UNFRIEND) { updateIDs.push(...userIDs); const updateRows = userIDs.map(userID => { const [user1, user2] = sortIDs(viewer.userID, userID); return { user1, user2, status: undirectedStatus.KNOW_OF }; }); const deleteQuery = SQL` DELETE FROM relationships_directed WHERE status = ${directedStatus.PENDING_FRIEND} AND (user1 = ${viewer.userID} AND user2 IN (${userIDs}) OR user1 IN (${userIDs}) AND user2 = ${viewer.userID}) `; await Promise.all([ updateUndirectedRelationships(updateRows, false), dbQuery(deleteQuery), ]); } else if (action === relationshipActions.BLOCK) { updateIDs.push(...userIDs); const directedRows = []; const undirectedRows = []; for (const userID of userIDs) { directedRows.push([viewer.userID, userID, directedStatus.BLOCKED]); const [user1, user2] = sortIDs(viewer.userID, userID); undirectedRows.push({ user1, user2, status: undirectedStatus.KNOW_OF }); } const directedInsertQuery = SQL` INSERT INTO relationships_directed (user1, user2, status) VALUES ${directedRows} - ON DUPLICATE KEY UPDATE status = VALUES(status) `; + if (dbType === 'mysql5.7') { + directedInsertQuery.append(SQL` + ON DUPLICATE KEY UPDATE status = VALUES(status) + `); + } else { + directedInsertQuery.append(SQL` + ON DUPLICATE KEY UPDATE status = VALUE(status) + `); + } const directedDeleteQuery = SQL` DELETE FROM relationships_directed WHERE status = ${directedStatus.PENDING_FRIEND} AND user1 IN (${userIDs}) AND user2 = ${viewer.userID} `; await Promise.all([ dbQuery(directedInsertQuery), dbQuery(directedDeleteQuery), updateUndirectedRelationships(undirectedRows, false), ]); } else if (action === relationshipActions.UNBLOCK) { updateIDs.push(...userIDs); const query = SQL` DELETE FROM relationships_directed WHERE status = ${directedStatus.BLOCKED} AND user1 = ${viewer.userID} AND user2 IN (${userIDs}) `; await dbQuery(query); } else { invariant(false, `action ${action} is invalid or not supported currently`); } await createUpdates( updateDatasForUserPairs(cartesianProduct([viewer.userID], updateIDs)), ); return Object.freeze({ ...errors }); } function updateDatasForUserPairs( userPairs: $ReadOnlyArray<[string, string]>, ): UpdateData[] { const time = Date.now(); const updateDatas = []; for (const [user1, user2] of userPairs) { updateDatas.push({ type: updateTypes.UPDATE_USER, userID: user1, time, updatedUserID: user2, }); updateDatas.push({ type: updateTypes.UPDATE_USER, userID: user2, time, updatedUserID: user1, }); } return updateDatas; } async function updateUndirectedRelationships( changeset: UndirectedRelationshipRow[], greatest: boolean = true, ) { if (!changeset.length) { return; } const rows = changeset.map(row => [row.user1, row.user2, row.status]); const query = SQL` INSERT INTO relationships_undirected (user1, user2, status) VALUES ${rows} `; + + const dbType = await getDBType(); if (greatest) { - query.append( - SQL`ON DUPLICATE KEY UPDATE status = GREATEST(status, VALUES(status))`, - ); + if (dbType === 'mysql5.7') { + query.append( + SQL`ON DUPLICATE KEY UPDATE status = GREATEST(status, VALUES(status))`, + ); + } else { + query.append( + SQL`ON DUPLICATE KEY UPDATE status = GREATEST(status, VALUE(status))`, + ); + } } else { - query.append(SQL`ON DUPLICATE KEY UPDATE status = VALUES(status)`); + if (dbType === 'mysql5.7') { + query.append(SQL`ON DUPLICATE KEY UPDATE status = VALUES(status)`); + } else { + query.append(SQL`ON DUPLICATE KEY UPDATE status = VALUE(status)`); + } } await dbQuery(query); } async function updateChangedUndirectedRelationships( changeset: UndirectedRelationshipRow[], ): Promise { if (changeset.length === 0) { return []; } const user2ByUser1: Map> = new Map(); for (const { user1, user2 } of changeset) { if (!user2ByUser1.has(user1)) { user2ByUser1.set(user1, new Set()); } user2ByUser1.get(user1)?.add(user2); } const selectQuery = SQL` SELECT user1, user2, status FROM relationships_undirected WHERE `; const conditions = []; for (const [user1, users] of user2ByUser1) { conditions.push(SQL`(user1 = ${user1} AND user2 IN (${[...users]}))`); } selectQuery.append(mergeOrConditions(conditions)); - const [result] = await dbQuery(selectQuery); + const [[result], dbType] = await Promise.all([ + dbQuery(selectQuery), + getDBType(), + ]); const existingStatuses = new Map(); for (const row of result) { existingStatuses.set(`${row.user1}|${row.user2}`, row.status); } const insertRows = []; for (const row of changeset) { const existingStatus = existingStatuses.get(`${row.user1}|${row.user2}`); if (!existingStatus || existingStatus < row.status) { insertRows.push([row.user1, row.user2, row.status]); } } if (insertRows.length === 0) { return []; } const insertQuery = SQL` INSERT INTO relationships_undirected (user1, user2, status) VALUES ${insertRows} - ON DUPLICATE KEY UPDATE status = GREATEST(status, VALUES(status)) `; + if (dbType === 'mysql5.7') { + insertQuery.append(SQL` + ON DUPLICATE KEY UPDATE status = GREATEST(status, VALUES(status)) + `); + } else { + insertQuery.append(SQL` + ON DUPLICATE KEY UPDATE status = GREATEST(status, VALUE(status)) + `); + } await dbQuery(insertQuery); return updateDatasForUserPairs( insertRows.map(([user1, user2]) => [user1, user2]), ); } async function createPersonalThreads( viewer: Viewer, request: RelationshipRequest, ) { invariant( request.action === relationshipActions.FRIEND, 'We should only create a PERSONAL threads when sending a FRIEND request, ' + `but we tried to do that for ${request.action}`, ); const threadIDPerUser = {}; const personalThreadsQuery = SQL` SELECT t.id AS threadID, m2.user AS user2 FROM threads t INNER JOIN memberships m1 ON m1.thread = t.id AND m1.user = ${viewer.userID} INNER JOIN memberships m2 ON m2.thread = t.id AND m2.user IN (${request.userIDs}) WHERE t.type = ${threadTypes.PERSONAL} AND m1.role > 0 AND m2.role > 0 `; const [personalThreadsResult] = await dbQuery(personalThreadsQuery); for (const row of personalThreadsResult) { const user2 = row.user2.toString(); threadIDPerUser[user2] = row.threadID.toString(); } const threadCreationPromises = {}; for (const userID of request.userIDs) { if (threadIDPerUser[userID]) { continue; } threadCreationPromises[userID] = createThread( viewer, { type: threadTypes.PERSONAL, initialMemberIDs: [userID], }, { forceAddMembers: true, updatesForCurrentSession: 'broadcast' }, ); } const personalThreadPerUser = await promiseAll(threadCreationPromises); for (const userID in personalThreadPerUser) { const newThread = personalThreadPerUser[userID]; threadIDPerUser[userID] = newThread.newThreadID ?? newThread.newThreadInfo.id; } return threadIDPerUser; } export { updateRelationships, updateDatasForUserPairs, updateUndirectedRelationships, updateChangedUndirectedRelationships, }; diff --git a/keyserver/src/updaters/thread-permission-updaters.js b/keyserver/src/updaters/thread-permission-updaters.js index b2ffba883..cae2046a0 100644 --- a/keyserver/src/updaters/thread-permission-updaters.js +++ b/keyserver/src/updaters/thread-permission-updaters.js @@ -1,1333 +1,1351 @@ // @flow import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual'; import bots from 'lib/facts/bots'; import genesis from 'lib/facts/genesis'; import { makePermissionsBlob, makePermissionsForChildrenBlob, getRoleForPermissions, } from 'lib/permissions/thread-permissions'; import type { CalendarQuery } from 'lib/types/entry-types'; import { type ThreadPermissionsBlob, type ThreadRolePermissionsBlob, type ThreadType, assertThreadType, } from 'lib/types/thread-types'; import { updateTypes, type ServerUpdateInfo, type CreateUpdatesResult, } from 'lib/types/update-types'; import { pushAll } from 'lib/utils/array'; import { ServerError } from 'lib/utils/errors'; import { createUpdates, type UpdatesForCurrentSession, } from '../creators/update-creator'; import { dbQuery, SQL } from '../database/database'; import { getDBType } from '../database/db-config'; import { fetchServerThreadInfos, rawThreadInfosFromServerThreadInfos, type FetchThreadInfosResult, } from '../fetchers/thread-fetchers'; import { rescindPushNotifs } from '../push/rescind'; import { createScriptViewer } from '../session/scripts'; import type { Viewer } from '../session/viewer'; import { updateRoles } from '../updaters/role-updaters'; import DepthQueue from '../utils/depth-queue'; import RelationshipChangeset from '../utils/relationship-changeset'; import { updateChangedUndirectedRelationships } from './relationship-updaters'; export type MembershipRowToSave = { +operation: 'save', +intent: 'join' | 'leave' | 'none', +userID: string, +threadID: string, +userNeedsFullThreadDetails: boolean, +permissions: ?ThreadPermissionsBlob, +permissionsForChildren: ?ThreadPermissionsBlob, // null role represents by "0" +role: string, +oldRole: string, +unread?: boolean, }; type MembershipRowToDelete = { +operation: 'delete', +intent: 'join' | 'leave' | 'none', +userID: string, +threadID: string, +oldRole: string, }; type MembershipRow = MembershipRowToSave | MembershipRowToDelete; type Changeset = { +membershipRows: MembershipRow[], +relationshipChangeset: RelationshipChangeset, }; // 0 role means to remove the user from the thread // null role means to set the user to the default role // string role means to set the user to the role with that ID // -1 role means to set the user as a "ghost" (former member) type ChangeRoleOptions = { +setNewMembersToUnread?: boolean, }; type ChangeRoleMemberInfo = { permissionsFromParent?: ?ThreadPermissionsBlob, memberOfContainingThread?: boolean, }; async function changeRole( threadID: string, userIDs: $ReadOnlyArray, role: string | -1 | 0 | null, options?: ChangeRoleOptions, ): Promise { const intent = role === -1 || role === 0 ? 'leave' : 'join'; const setNewMembersToUnread = options?.setNewMembersToUnread && intent === 'join'; if (userIDs.length === 0) { return { membershipRows: [], relationshipChangeset: new RelationshipChangeset(), }; } const membershipQuery = SQL` SELECT user, role, permissions, permissions_for_children FROM memberships WHERE thread = ${threadID} `; const parentMembershipQuery = SQL` SELECT pm.user, pm.permissions_for_children AS permissions_from_parent FROM threads t INNER JOIN memberships pm ON pm.thread = t.parent_thread_id WHERE t.id = ${threadID} AND (pm.user IN (${userIDs}) OR t.parent_thread_id != ${genesis.id}) `; const containingMembershipQuery = SQL` SELECT cm.user, cm.role AS containing_role FROM threads t INNER JOIN memberships cm ON cm.thread = t.containing_thread_id WHERE t.id = ${threadID} AND cm.user IN (${userIDs}) `; const [ dbType, [membershipResults], [parentMembershipResults], containingMembershipResults, roleThreadResult, ] = await Promise.all([ getDBType(), dbQuery(membershipQuery), dbQuery(parentMembershipQuery), (async () => { if (intent === 'leave') { // Membership in the container only needs to be checked for members return []; } const [result] = await dbQuery(containingMembershipQuery); return result; })(), changeRoleThreadQuery(threadID, role), ]); const { roleColumnValue: intendedRole, threadType, parentThreadID, hasContainingThreadID, rolePermissions: intendedRolePermissions, depth, } = roleThreadResult; const existingMembershipInfo = new Map(); for (const row of membershipResults) { const userID = row.user.toString(); existingMembershipInfo.set(userID, { oldRole: row.role.toString(), oldPermissions: dbType === 'mysql5.7' ? row.permissions : JSON.parse(row.permissions), oldPermissionsForChildren: dbType === 'mysql5.7' ? row.permissions_for_children : JSON.parse(row.permissions_for_children), }); } const ancestorMembershipInfo: Map = new Map(); for (const row of parentMembershipResults) { const userID = row.user.toString(); if (!userIDs.includes(userID)) { continue; } ancestorMembershipInfo.set(userID, { permissionsFromParent: dbType === 'mysql5.7' ? row.permissions_from_parent : JSON.parse(row.permissions_from_parent), }); } for (const row of containingMembershipResults) { const userID = row.user.toString(); const ancestorMembership = ancestorMembershipInfo.get(userID); const memberOfContainingThread = row.containing_role > 0; if (ancestorMembership) { ancestorMembership.memberOfContainingThread = memberOfContainingThread; } else { ancestorMembershipInfo.set(userID, { memberOfContainingThread, }); } } const relationshipChangeset = new RelationshipChangeset(); const existingMemberIDs = [...existingMembershipInfo.keys()]; if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const parentMemberIDs = parentMembershipResults.map(row => row.user.toString(), ); if (parentThreadID && parentThreadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(parentMemberIDs); } const membershipRows = []; const toUpdateDescendants = new Map(); for (const userID of userIDs) { const existingMembership = existingMembershipInfo.get(userID); const oldRole = existingMembership?.oldRole ?? '-1'; const oldPermissions = existingMembership?.oldPermissions ?? null; const oldPermissionsForChildren = existingMembership?.oldPermissionsForChildren ?? null; if (existingMembership && oldRole === intendedRole) { // If the old role is the same as the new one, we have nothing to update continue; } else if (Number(oldRole) > 0 && role === null) { // In the case where we're just trying to add somebody to a thread, if // they already have a role with a nonzero role then we don't need to do // anything continue; } let permissionsFromParent = null; let memberOfContainingThread = false; const ancestorMembership = ancestorMembershipInfo.get(userID); if (ancestorMembership) { permissionsFromParent = ancestorMembership.permissionsFromParent; memberOfContainingThread = ancestorMembership.memberOfContainingThread; } if (!hasContainingThreadID) { memberOfContainingThread = true; } const rolePermissions = memberOfContainingThread ? intendedRolePermissions : null; const targetRole = memberOfContainingThread ? intendedRole : '-1'; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userBecameMember = Number(oldRole) <= 0 && Number(newRole) > 0; const userLostMembership = Number(oldRole) > 0 && Number(newRole) <= 0; if ( (intent === 'join' && Number(newRole) <= 0) || (intent === 'leave' && Number(newRole) > 0) ) { throw new ServerError('invalid_parameters'); } else if (intendedRole !== newRole) { console.warn( `changeRole called for role=${intendedRole}, but ended up setting ` + `role=${newRole} for userID ${userID} and threadID ${threadID}, ` + 'probably because KNOW_OF permission was unexpectedly present or ' + 'missing', ); } if ( existingMembership && _isEqual(permissions)(oldPermissions) && oldRole === newRole ) { // This thread and all of its descendants need no updates for this user, // since the corresponding memberships row is unchanged by this operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent, userID, threadID, userNeedsFullThreadDetails: userBecameMember, permissions, permissionsForChildren, role: newRole, oldRole, unread: userBecameMember && setNewMembersToUnread, }); } else { membershipRows.push({ operation: 'delete', intent, userID, threadID, oldRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { relationshipChangeset.setRelationshipsNeeded(userID, existingMemberIDs); } if ( userLostMembership || !_isEqual(permissionsForChildren)(oldPermissionsForChildren) ) { toUpdateDescendants.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (toUpdateDescendants.size > 0) { const { membershipRows: descendantMembershipRows, relationshipChangeset: descendantRelationshipChangeset, } = await updateDescendantPermissions({ threadID, depth, changesByUser: toUpdateDescendants, }); pushAll(membershipRows, descendantMembershipRows); relationshipChangeset.addAll(descendantRelationshipChangeset); } return { membershipRows, relationshipChangeset }; } type RoleThreadResult = { +roleColumnValue: string, +depth: number, +threadType: ThreadType, +parentThreadID: ?string, +hasContainingThreadID: boolean, +rolePermissions: ?ThreadRolePermissionsBlob, }; async function changeRoleThreadQuery( threadID: string, role: string | -1 | 0 | null, ): Promise { if (role === 0 || role === -1) { const query = SQL` SELECT type, depth, parent_thread_id, containing_thread_id FROM threads WHERE id = ${threadID} `; const [result] = await dbQuery(query); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: role.toString(), depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: null, }; } else if (role !== null) { const query = SQL` SELECT t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions FROM threads t INNER JOIN roles r ON r.thread = t.id AND r.id = ${role} WHERE t.id = ${threadID} `; const [[result], dbType] = await Promise.all([dbQuery(query), getDBType()]); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: role, depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: dbType === 'mysql5.7' ? row.permissions : JSON.parse(row.permissions), }; } else { const query = SQL` SELECT t.type, t.depth, t.parent_thread_id, t.containing_thread_id, t.default_role, r.permissions FROM threads t INNER JOIN roles r ON r.thread = t.id AND r.id = t.default_role WHERE t.id = ${threadID} `; const [[result], dbType] = await Promise.all([dbQuery(query), getDBType()]); if (result.length === 0) { throw new ServerError('internal_error'); } const row = result[0]; return { roleColumnValue: row.default_role.toString(), depth: row.depth, threadType: assertThreadType(row.type), parentThreadID: row.parent_thread_id ? row.parent_thread_id.toString() : null, hasContainingThreadID: row.containing_thread_id !== null, rolePermissions: dbType === 'mysql5.7' ? row.permissions : JSON.parse(row.permissions), }; } } type ChangedAncestor = { +threadID: string, +depth: number, +changesByUser: Map, }; type AncestorChanges = { +userIsMember: boolean, +permissionsForChildren: ?ThreadPermissionsBlob, }; async function updateDescendantPermissions( initialChangedAncestor: ChangedAncestor, ): Promise { const membershipRows = []; const relationshipChangeset = new RelationshipChangeset(); const initialDescendants = await fetchDescendantsForUpdate([ initialChangedAncestor, ]); const depthQueue = new DepthQueue( getDescendantDepth, getDescendantKey, mergeDescendants, ); depthQueue.addInfos(initialDescendants); let descendants; while ((descendants = depthQueue.getNextDepth())) { const descendantsAsAncestors = []; for (const descendant of descendants) { const { threadID, threadType, depth, users } = descendant; const existingMembers = [...users.entries()]; const existingMemberIDs = existingMembers .filter(([, { curRole }]) => curRole) .map(([userID]) => userID); if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const usersForNextLayer = new Map(); for (const [userID, user] of users) { const { curRolePermissions, curPermissionsFromParent, curMemberOfContainingThread, nextMemberOfContainingThread, nextPermissionsFromParent, potentiallyNeedsUpdate, } = user; const existingMembership = !!user.curRole; const curRole = user.curRole ?? '-1'; const curPermissions = user.curPermissions ?? null; const curPermissionsForChildren = user.curPermissionsForChildren ?? null; if (!potentiallyNeedsUpdate) { continue; } const permissionsFromParent = nextPermissionsFromParent === undefined ? curPermissionsFromParent : nextPermissionsFromParent; const memberOfContainingThread = nextMemberOfContainingThread === undefined ? curMemberOfContainingThread : nextMemberOfContainingThread; const targetRole = memberOfContainingThread ? curRole : '-1'; const rolePermissions = memberOfContainingThread ? curRolePermissions : null; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob( permissions, ); const newRole = getRoleForPermissions(targetRole, permissions); const userLostMembership = Number(curRole) > 0 && Number(newRole) <= 0; if (_isEqual(permissions)(curPermissions) && curRole === newRole) { // This thread and all of its descendants need no updates for this // user, since the corresponding memberships row is unchanged by this // operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent: 'none', userID, threadID, userNeedsFullThreadDetails: false, permissions, permissionsForChildren, role: newRole, oldRole: curRole, }); } else { membershipRows.push({ operation: 'delete', intent: 'none', userID, threadID, oldRole: curRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { // If there was no membership row before, and we are creating one, // we'll need to make sure the new member has a relationship row with // each existing member. We expect that whoever called us already // generated memberships row for the new members, will will lead // saveMemberships to generate relationships rows between those new // users. relationshipChangeset.setRelationshipsNeeded( userID, existingMemberIDs, ); } if ( userLostMembership || !_isEqual(permissionsForChildren)(curPermissionsForChildren) ) { usersForNextLayer.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (usersForNextLayer.size > 0) { descendantsAsAncestors.push({ threadID, depth, changesByUser: usersForNextLayer, }); } } const nextDescendants = await fetchDescendantsForUpdate( descendantsAsAncestors, ); depthQueue.addInfos(nextDescendants); } return { membershipRows, relationshipChangeset }; } type DescendantUserInfo = $Shape<{ curRole?: string, curRolePermissions?: ?ThreadRolePermissionsBlob, curPermissions?: ?ThreadPermissionsBlob, curPermissionsForChildren?: ?ThreadPermissionsBlob, curPermissionsFromParent?: ?ThreadPermissionsBlob, curMemberOfContainingThread?: boolean, nextPermissionsFromParent?: ?ThreadPermissionsBlob, nextMemberOfContainingThread?: boolean, potentiallyNeedsUpdate?: boolean, }>; type DescendantInfo = { +threadID: string, +parentThreadID: string, +containingThreadID: string, +threadType: ThreadType, +depth: number, +users: Map, }; const fetchDescendantsBatchSize = 10; async function fetchDescendantsForUpdate( ancestors: $ReadOnlyArray, ): Promise { const threadIDs = ancestors.map(ancestor => ancestor.threadID); const rows = []; const queriesPromise = (async () => { while (threadIDs.length > 0) { const batch = threadIDs.splice(0, fetchDescendantsBatchSize); const query = SQL` SELECT t.id, m.user, t.type, t.depth, t.parent_thread_id, t.containing_thread_id, r.permissions AS role_permissions, m.permissions, m.permissions_for_children, m.role, pm.permissions_for_children AS permissions_from_parent, cm.role AS containing_role FROM threads t INNER JOIN memberships m ON m.thread = t.id LEFT JOIN memberships pm ON pm.thread = t.parent_thread_id AND pm.user = m.user LEFT JOIN memberships cm ON cm.thread = t.containing_thread_id AND cm.user = m.user LEFT JOIN roles r ON r.id = m.role WHERE t.parent_thread_id IN (${batch}) OR t.containing_thread_id IN (${batch}) `; const [results] = await dbQuery(query); pushAll(rows, results); } })(); const [dbType] = await Promise.all([getDBType(), queriesPromise]); const descendantThreadInfos: Map = new Map(); for (const row of rows) { const descendantThreadID = row.id.toString(); if (!descendantThreadInfos.has(descendantThreadID)) { descendantThreadInfos.set(descendantThreadID, { threadID: descendantThreadID, parentThreadID: row.parent_thread_id.toString(), containingThreadID: row.containing_thread_id.toString(), threadType: assertThreadType(row.type), depth: row.depth, users: new Map(), }); } const descendantThreadInfo = descendantThreadInfos.get(descendantThreadID); invariant( descendantThreadInfo, `value should exist for key ${descendantThreadID}`, ); const userID = row.user.toString(); descendantThreadInfo.users.set(userID, { curRole: row.role.toString(), curRolePermissions: dbType === 'mysql5.7' ? row.role_permissions : JSON.parse(row.role_permissions), curPermissions: dbType === 'mysql5.7' ? row.permissions : JSON.parse(row.permissions), curPermissionsForChildren: dbType === 'mysql5.7' ? row.permissions_for_children : JSON.parse(row.permissions_for_children), curPermissionsFromParent: dbType === 'mysql5.7' ? row.permissions_from_parent : JSON.parse(row.permissions_from_parent), curMemberOfContainingThread: row.containing_role > 0, }); } for (const ancestor of ancestors) { const { threadID, changesByUser } = ancestor; for (const [userID, changes] of changesByUser) { for (const descendantThreadInfo of descendantThreadInfos.values()) { const { users, parentThreadID, containingThreadID, } = descendantThreadInfo; if (threadID !== parentThreadID && threadID !== containingThreadID) { continue; } let user = users.get(userID); if (!user) { user = {}; users.set(userID, user); } if (threadID === parentThreadID) { user.nextPermissionsFromParent = changes.permissionsForChildren; user.potentiallyNeedsUpdate = true; } if (threadID === containingThreadID) { user.nextMemberOfContainingThread = changes.userIsMember; if (!user.nextMemberOfContainingThread) { user.potentiallyNeedsUpdate = true; } } } } } return [...descendantThreadInfos.values()]; } function getDescendantDepth(descendant: DescendantInfo): number { return descendant.depth; } function getDescendantKey(descendant: DescendantInfo): string { return descendant.threadID; } function mergeDescendants( a: DescendantInfo, b: DescendantInfo, ): DescendantInfo { const { users: usersA, ...restA } = a; const { users: usersB, ...restB } = b; if (!_isEqual(restA)(restB)) { console.warn( `inconsistent descendantInfos ${JSON.stringify(restA)}, ` + JSON.stringify(restB), ); throw new ServerError('internal_error'); } const newUsers = new Map(usersA); for (const [userID, userFromB] of usersB) { const userFromA = newUsers.get(userID); if (!userFromA) { newUsers.set(userID, userFromB); } else { newUsers.set(userID, { ...userFromA, ...userFromB }); } } return { ...a, users: newUsers }; } type RecalculatePermissionsMemberInfo = { role?: ?string, permissions?: ?ThreadPermissionsBlob, permissionsForChildren?: ?ThreadPermissionsBlob, rolePermissions?: ?ThreadRolePermissionsBlob, memberOfContainingThread?: boolean, permissionsFromParent?: ?ThreadPermissionsBlob, }; async function recalculateThreadPermissions( threadID: string, ): Promise { const threadQuery = SQL` SELECT type, depth, parent_thread_id, containing_thread_id FROM threads WHERE id = ${threadID} `; const membershipQuery = SQL` SELECT m.user, m.role, m.permissions, m.permissions_for_children, r.permissions AS role_permissions, cm.role AS containing_role FROM threads t INNER JOIN memberships m ON m.thread = t.id LEFT JOIN roles r ON r.id = m.role LEFT JOIN memberships cm ON cm.user = m.user AND cm.thread = t.containing_thread_id WHERE t.id = ${threadID} `; const parentMembershipQuery = SQL` SELECT pm.user, pm.permissions_for_children AS permissions_from_parent FROM threads t INNER JOIN memberships pm ON pm.thread = t.parent_thread_id WHERE t.id = ${threadID} `; const [ dbType, [threadResults], [membershipResults], [parentMembershipResults], ] = await Promise.all([ getDBType(), dbQuery(threadQuery), dbQuery(membershipQuery), dbQuery(parentMembershipQuery), ]); if (threadResults.length !== 1) { throw new ServerError('internal_error'); } const [threadResult] = threadResults; const threadType = assertThreadType(threadResult.type); const depth = threadResult.depth; const hasContainingThreadID = threadResult.containing_thread_id !== null; const parentThreadID = threadResult.parent_thread_id?.toString(); const membershipInfo: Map< string, RecalculatePermissionsMemberInfo, > = new Map(); for (const row of membershipResults) { const userID = row.user.toString(); membershipInfo.set(userID, { role: row.role.toString(), permissions: dbType === 'mysql5.7' ? row.permissions : JSON.parse(row.permissions), permissionsForChildren: dbType === 'mysql5.7' ? row.permissions_for_children : JSON.parse(row.permissions_for_children), rolePermissions: dbType === 'mysql5.7' ? row.role_permissions : JSON.parse(row.role_permissions), memberOfContainingThread: !!( row.containing_role && row.containing_role > 0 ), }); } for (const row of parentMembershipResults) { const userID = row.user.toString(); const permissionsFromParent = dbType === 'mysql5.7' ? row.permissions_from_parent : JSON.parse(row.permissions_from_parent); const membership = membershipInfo.get(userID); if (membership) { membership.permissionsFromParent = permissionsFromParent; } else { membershipInfo.set(userID, { permissionsFromParent: permissionsFromParent, }); } } const relationshipChangeset = new RelationshipChangeset(); const existingMemberIDs = membershipResults.map(row => row.user.toString()); if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(existingMemberIDs); } const parentMemberIDs = parentMembershipResults.map(row => row.user.toString(), ); if (parentThreadID && parentThreadID !== genesis.id) { relationshipChangeset.setAllRelationshipsExist(parentMemberIDs); } const membershipRows = []; const toUpdateDescendants = new Map(); for (const [userID, membership] of membershipInfo) { const { rolePermissions: intendedRolePermissions, permissionsFromParent, } = membership; const oldPermissions = membership?.permissions ?? null; const oldPermissionsForChildren = membership?.permissionsForChildren ?? null; const existingMembership = membership.role !== undefined; const oldRole = membership.role ?? '-1'; const memberOfContainingThread = hasContainingThreadID ? !!membership.memberOfContainingThread : true; const targetRole = memberOfContainingThread ? oldRole : '-1'; const rolePermissions = memberOfContainingThread ? intendedRolePermissions : null; const permissions = makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ); const permissionsForChildren = makePermissionsForChildrenBlob(permissions); const newRole = getRoleForPermissions(targetRole, permissions); const userLostMembership = Number(oldRole) > 0 && Number(newRole) <= 0; if (_isEqual(permissions)(oldPermissions) && oldRole === newRole) { // This thread and all of its descendants need no updates for this user, // since the corresponding memberships row is unchanged by this operation continue; } if (permissions) { membershipRows.push({ operation: 'save', intent: 'none', userID, threadID, userNeedsFullThreadDetails: false, permissions, permissionsForChildren, role: newRole, oldRole, }); } else { membershipRows.push({ operation: 'delete', intent: 'none', userID, threadID, oldRole, }); } if (permissions && !existingMembership && threadID !== genesis.id) { // If there was no membership row before, and we are creating one, // we'll need to make sure the new member has a relationship row with // each existing member. We handle guaranteeing that new members have // relationship rows with each other in saveMemberships. relationshipChangeset.setRelationshipsNeeded(userID, existingMemberIDs); } if ( userLostMembership || !_isEqual(permissionsForChildren)(oldPermissionsForChildren) ) { toUpdateDescendants.set(userID, { userIsMember: Number(newRole) > 0, permissionsForChildren, }); } } if (toUpdateDescendants.size > 0) { const { membershipRows: descendantMembershipRows, relationshipChangeset: descendantRelationshipChangeset, } = await updateDescendantPermissions({ threadID, depth, changesByUser: toUpdateDescendants, }); pushAll(membershipRows, descendantMembershipRows); relationshipChangeset.addAll(descendantRelationshipChangeset); } return { membershipRows, relationshipChangeset }; } const defaultSubscriptionString = JSON.stringify({ home: false, pushNotifs: false, }); const joinSubscriptionString = JSON.stringify({ home: true, pushNotifs: true }); const membershipInsertBatchSize = 50; async function saveMemberships(toSave: $ReadOnlyArray) { if (toSave.length === 0) { return; } const time = Date.now(); const insertRows = []; for (const rowToSave of toSave) { insertRows.push([ rowToSave.userID, rowToSave.threadID, rowToSave.role, time, rowToSave.intent === 'join' ? joinSubscriptionString : defaultSubscriptionString, rowToSave.permissions ? JSON.stringify(rowToSave.permissions) : null, rowToSave.permissionsForChildren ? JSON.stringify(rowToSave.permissionsForChildren) : null, rowToSave.unread ? 1 : 0, 0, ]); } // Logic below will only update an existing membership row's `subscription` // column if the user is either joining or leaving the thread. That means // there's no way to use this function to update a user's subscription without // also making them join or leave the thread. The reason we do this is because // we need to specify a value for `subscription` here, as it's a non-null // column and this is an INSERT, but we don't want to require people to have // to know the current `subscription` when they're just using this function to // update the permissions of an existing membership row. + const dbType = await getDBType(); while (insertRows.length > 0) { const batch = insertRows.splice(0, membershipInsertBatchSize); const query = SQL` INSERT INTO memberships (user, thread, role, creation_time, subscription, permissions, permissions_for_children, last_message, last_read_message) VALUES ${batch} + `; + if (dbType === 'mysql5.7') { + query.append(SQL` ON DUPLICATE KEY UPDATE subscription = IF( (role <= 0 AND VALUES(role) > 0) OR (role > 0 AND VALUES(role) <= 0), VALUES(subscription), subscription ), role = VALUES(role), permissions = VALUES(permissions), permissions_for_children = VALUES(permissions_for_children) - `; + `); + } else { + query.append(SQL` + ON DUPLICATE KEY UPDATE + subscription = IF( + (role <= 0 AND VALUE(role) > 0) + OR (role > 0 AND VALUE(role) <= 0), + VALUE(subscription), + subscription + ), + role = VALUE(role), + permissions = VALUE(permissions), + permissions_for_children = VALUE(permissions_for_children) + `); + } await dbQuery(query); } } async function deleteMemberships( toDelete: $ReadOnlyArray, ) { if (toDelete.length === 0) { return; } const time = Date.now(); const insertRows = toDelete.map(rowToDelete => [ rowToDelete.userID, rowToDelete.threadID, -1, time, defaultSubscriptionString, null, null, 0, 0, ]); while (insertRows.length > 0) { const batch = insertRows.splice(0, membershipInsertBatchSize); const query = SQL` INSERT INTO memberships (user, thread, role, creation_time, subscription, permissions, permissions_for_children, last_message, last_read_message) VALUES ${batch} ON DUPLICATE KEY UPDATE role = -1, permissions = NULL, permissions_for_children = NULL, subscription = ${defaultSubscriptionString}, last_message = 0, last_read_message = 0 `; await dbQuery(query); } } // Specify non-empty changedThreadIDs to force updates to be generated for those // threads, presumably for reasons not covered in the changeset. calendarQuery // only needs to be specified if a JOIN_THREAD update will be generated for the // viewer, in which case it's necessary for knowing the set of entries to fetch. type ChangesetCommitResult = { ...FetchThreadInfosResult, ...CreateUpdatesResult, }; async function commitMembershipChangeset( viewer: Viewer, changeset: Changeset, { changedThreadIDs = new Set(), calendarQuery, updatesForCurrentSession = 'return', }: { +changedThreadIDs?: Set, +calendarQuery?: ?CalendarQuery, +updatesForCurrentSession?: UpdatesForCurrentSession, } = {}, ): Promise { if (!viewer.loggedIn) { throw new ServerError('not_logged_in'); } const { membershipRows, relationshipChangeset } = changeset; const membershipRowMap = new Map(); for (const row of membershipRows) { const { userID, threadID } = row; changedThreadIDs.add(threadID); const pairString = `${userID}|${threadID}`; const existing = membershipRowMap.get(pairString); invariant( !existing || existing.intent === 'none' || row.intent === 'none', `multiple intents provided for ${pairString}`, ); if (!existing || existing.intent === 'none') { membershipRowMap.set(pairString, row); } } const toSave = [], toDelete = [], toRescindPushNotifs = []; for (const row of membershipRowMap.values()) { if ( row.operation === 'delete' || (row.operation === 'save' && Number(row.role) <= 0) ) { const { userID, threadID } = row; toRescindPushNotifs.push({ userID, threadID }); } if (row.operation === 'delete') { toDelete.push(row); } else { toSave.push(row); } } const threadsToSavedUsers = new Map(); for (const row of membershipRowMap.values()) { const { userID, threadID } = row; let savedUsers = threadsToSavedUsers.get(threadID); if (!savedUsers) { savedUsers = []; threadsToSavedUsers.set(threadID, savedUsers); } savedUsers.push(userID); } for (const [threadID, savedUsers] of threadsToSavedUsers) { if (threadID !== genesis.id) { relationshipChangeset.setAllRelationshipsNeeded(savedUsers); } } const relationshipRows = relationshipChangeset.getRows(); const [updateDatas] = await Promise.all([ updateChangedUndirectedRelationships(relationshipRows), saveMemberships(toSave), deleteMemberships(toDelete), rescindPushNotifsForMemberDeletion(toRescindPushNotifs), ]); // We fetch all threads here because old clients still expect the full list of // threads on most thread operations. Once verifyClientSupported gates on // codeVersion 62, we can add a WHERE clause on changedThreadIDs here const serverThreadInfoFetchResult = await fetchServerThreadInfos(); const { threadInfos: serverThreadInfos } = serverThreadInfoFetchResult; const time = Date.now(); for (const changedThreadID of changedThreadIDs) { const serverThreadInfo = serverThreadInfos[changedThreadID]; for (const memberInfo of serverThreadInfo.members) { const pairString = `${memberInfo.id}|${serverThreadInfo.id}`; const membershipRow = membershipRowMap.get(pairString); if (membershipRow) { continue; } updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID: memberInfo.id, time, threadID: changedThreadID, }); } } for (const row of membershipRowMap.values()) { const { userID, threadID } = row; if (row.operation === 'delete' || row.role === '-1') { if (row.oldRole !== '-1') { updateDatas.push({ type: updateTypes.DELETE_THREAD, userID, time, threadID, }); } } else if (row.userNeedsFullThreadDetails) { updateDatas.push({ type: updateTypes.JOIN_THREAD, userID, time, threadID, }); } else { updateDatas.push({ type: updateTypes.UPDATE_THREAD, userID, time, threadID, }); } } const threadInfoFetchResult = rawThreadInfosFromServerThreadInfos( viewer, serverThreadInfoFetchResult, ); const { viewerUpdates, userInfos } = await createUpdates(updateDatas, { viewer, calendarQuery, ...threadInfoFetchResult, updatesForCurrentSession, }); return { ...threadInfoFetchResult, userInfos, viewerUpdates, }; } // When the user tries to create a new thread, it's possible for the client to // fail the creation even if a row gets added to the threads table. This may // occur due to a timeout (on either the client or server side), or due to some // error in the server code following the INSERT operation. Handling the error // scenario is more challenging since it would require detecting which set of // operations failed so we could retry them. As a result, this code is geared at // only handling the timeout scenario. async function getChangesetCommitResultForExistingThread( viewer: Viewer, threadID: string, otherUpdates: $ReadOnlyArray, { calendarQuery, updatesForCurrentSession = 'return', }: { +calendarQuery?: ?CalendarQuery, +updatesForCurrentSession?: UpdatesForCurrentSession, } = {}, ): Promise { for (const update of otherUpdates) { if ( update.type === updateTypes.JOIN_THREAD && update.threadInfo.id === threadID ) { // If the JOIN_THREAD is already there we can expect // the appropriate UPDATE_USERs to be covered as well return { viewerUpdates: otherUpdates, userInfos: {} }; } } const time = Date.now(); const updateDatas = [ { type: updateTypes.JOIN_THREAD, userID: viewer.userID, time, threadID, targetSession: viewer.session, }, ]; // To figure out what UserInfos might be missing, we consider the worst case: // the same client previously attempted to create a thread with a non-friend // they found via search results, but the request timed out. In this scenario // the viewer might never have received the UPDATE_USER that would add that // UserInfo to their UserStore, but the server assumed the client had gotten // it because createUpdates was called with UpdatesForCurrentSession=return. // For completeness here we query for the full list of memberships rows in the // thread. We can't use fetchServerThreadInfos because it skips role=-1 rows const membershipsQuery = SQL` SELECT user FROM memberships WHERE thread = ${threadID} AND user != ${viewer.userID} `; const [results] = await dbQuery(membershipsQuery); for (const row of results) { updateDatas.push({ type: updateTypes.UPDATE_USER, userID: viewer.userID, time, updatedUserID: row.user.toString(), targetSession: viewer.session, }); } const { viewerUpdates, userInfos } = await createUpdates(updateDatas, { viewer, calendarQuery, updatesForCurrentSession, }); return { viewerUpdates: [...otherUpdates, ...viewerUpdates], userInfos }; } const rescindPushNotifsBatchSize = 3; async function rescindPushNotifsForMemberDeletion( toRescindPushNotifs: $ReadOnlyArray<{ +userID: string, +threadID: string }>, ): Promise { const queue = [...toRescindPushNotifs]; while (queue.length > 0) { const batch = queue.splice(0, rescindPushNotifsBatchSize); await Promise.all( batch.map(({ userID, threadID }) => rescindPushNotifs( SQL`n.thread = ${threadID} AND n.user = ${userID}`, SQL`IF(m.thread = ${threadID}, NULL, m.thread)`, ), ), ); } } async function recalculateAllThreadPermissions() { const getAllThreads = SQL`SELECT id FROM threads`; const [result] = await dbQuery(getAllThreads); // We handle each thread one-by-one to avoid a situation where a permission // calculation for a child thread, done during a call to // recalculateThreadPermissions for the parent thread, can be incorrectly // overriden by a call to recalculateThreadPermissions for the child thread. // If the changeset resulting from the parent call isn't committed before the // calculation is done for the child, the calculation done for the child can // be incorrect. const viewer = createScriptViewer(bots.commbot.userID); for (const row of result) { const threadID = row.id.toString(); const changeset = await recalculateThreadPermissions(threadID); await commitMembershipChangeset(viewer, changeset); } } async function updateRolesAndPermissionsForAllThreads() { const batchSize = 10; const fetchThreads = SQL`SELECT id, type, depth FROM threads`; const [result] = await dbQuery(fetchThreads); const allThreads = result.map(row => { return { id: row.id.toString(), type: assertThreadType(row.type), depth: row.depth, }; }); const viewer = createScriptViewer(bots.commbot.userID); const maxDepth = Math.max(...allThreads.map(row => row.depth)); for (let depth = 0; depth <= maxDepth; depth++) { const threads = allThreads.filter(row => row.depth === depth); console.log(`recalculating permissions for threads with depth ${depth}`); while (threads.length > 0) { const batch = threads.splice(0, batchSize); const membershipRows = []; const relationshipChangeset = new RelationshipChangeset(); await Promise.all( batch.map(async thread => { console.log(`updating roles for ${thread.id}`); await updateRoles(viewer, thread.id, thread.type); console.log(`recalculating permissions for ${thread.id}`); const { membershipRows: threadMembershipRows, relationshipChangeset: threadRelationshipChangeset, } = await recalculateThreadPermissions(thread.id); membershipRows.push(...threadMembershipRows); relationshipChangeset.addAll(threadRelationshipChangeset); }), ); console.log(`committing batch ${JSON.stringify(batch)}`); await commitMembershipChangeset(viewer, { membershipRows, relationshipChangeset, }); } } } export { changeRole, recalculateThreadPermissions, getChangesetCommitResultForExistingThread, saveMemberships, commitMembershipChangeset, recalculateAllThreadPermissions, updateRolesAndPermissionsForAllThreads, };