diff --git a/lib/shared/messages/add-members-message-spec.js b/lib/shared/messages/add-members-message-spec.js new file mode 100644 index 000000000..5b9fab377 --- /dev/null +++ b/lib/shared/messages/add-members-message-spec.js @@ -0,0 +1,12 @@ +// @flow + +import type { AddMembersMessageData } from '../../types/message/add-members'; +import type { MessageSpec } from './message-spec'; + +export const addMembersMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify(data.addedUserIDs); + }, + }, +); diff --git a/lib/shared/messages/change-role-message-spec.js b/lib/shared/messages/change-role-message-spec.js new file mode 100644 index 000000000..c501b7bb3 --- /dev/null +++ b/lib/shared/messages/change-role-message-spec.js @@ -0,0 +1,15 @@ +// @flow + +import type { ChangeRoleMessageData } from '../../types/message/change-role'; +import type { MessageSpec } from './message-spec'; + +export const changeRoleMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + userIDs: data.userIDs, + newRole: data.newRole, + }); + }, + }, +); diff --git a/lib/shared/messages/change-settings-message-spec.js b/lib/shared/messages/change-settings-message-spec.js new file mode 100644 index 000000000..f224b7c95 --- /dev/null +++ b/lib/shared/messages/change-settings-message-spec.js @@ -0,0 +1,14 @@ +// @flow + +import type { ChangeSettingsMessageData } from '../../types/message/change-settings'; +import type { MessageSpec } from './message-spec'; + +export const changeSettingsMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + [data.field]: data.value, + }); + }, + }, +); diff --git a/lib/shared/messages/create-entry-message-spec.js b/lib/shared/messages/create-entry-message-spec.js new file mode 100644 index 000000000..dbcc75db3 --- /dev/null +++ b/lib/shared/messages/create-entry-message-spec.js @@ -0,0 +1,16 @@ +// @flow + +import type { CreateEntryMessageData } from '../../types/message/create-entry'; +import type { MessageSpec } from './message-spec'; + +export const createEntryMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + entryID: data.entryID, + date: data.date, + text: data.text, + }); + }, + }, +); diff --git a/lib/shared/messages/create-sidebar-message-spec.js b/lib/shared/messages/create-sidebar-message-spec.js new file mode 100644 index 000000000..11038a05a --- /dev/null +++ b/lib/shared/messages/create-sidebar-message-spec.js @@ -0,0 +1,15 @@ +// @flow + +import type { CreateSidebarMessageData } from '../../types/message/create-sidebar'; +import type { MessageSpec } from './message-spec'; + +export const createSidebarMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + ...data.initialThreadState, + sourceMessageAuthorID: data.sourceMessageAuthorID, + }); + }, + }, +); diff --git a/lib/shared/messages/create-sub-thread-message-spec.js b/lib/shared/messages/create-sub-thread-message-spec.js new file mode 100644 index 000000000..d4e69f684 --- /dev/null +++ b/lib/shared/messages/create-sub-thread-message-spec.js @@ -0,0 +1,12 @@ +// @flow + +import type { CreateSubthreadMessageData } from '../../types/message/create-subthread'; +import type { MessageSpec } from './message-spec'; + +export const createSubThreadMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return data.childThreadID; + }, + }, +); diff --git a/lib/shared/messages/create-thread-message-spec.js b/lib/shared/messages/create-thread-message-spec.js new file mode 100644 index 000000000..ed40d2858 --- /dev/null +++ b/lib/shared/messages/create-thread-message-spec.js @@ -0,0 +1,12 @@ +// @flow + +import type { CreateThreadMessageData } from '../../types/message/create-thread'; +import type { MessageSpec } from './message-spec'; + +export const createThreadMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify(data.initialThreadState); + }, + }, +); diff --git a/lib/shared/messages/delete-entry-message-spec.js b/lib/shared/messages/delete-entry-message-spec.js new file mode 100644 index 000000000..4ae8920f1 --- /dev/null +++ b/lib/shared/messages/delete-entry-message-spec.js @@ -0,0 +1,16 @@ +// @flow + +import type { DeleteEntryMessageData } from '../../types/message/delete-entry'; +import type { MessageSpec } from './message-spec'; + +export const deleteEntryMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + entryID: data.entryID, + date: data.date, + text: data.text, + }); + }, + }, +); diff --git a/lib/shared/messages/edit-entry-message-spec.js b/lib/shared/messages/edit-entry-message-spec.js new file mode 100644 index 000000000..10a27ef2a --- /dev/null +++ b/lib/shared/messages/edit-entry-message-spec.js @@ -0,0 +1,16 @@ +// @flow + +import type { EditEntryMessageData } from '../../types/message/edit-entry'; +import type { MessageSpec } from './message-spec'; + +export const editEntryMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + entryID: data.entryID, + date: data.date, + text: data.text, + }); + }, + }, +); diff --git a/lib/shared/messages/images-message-spec.js b/lib/shared/messages/images-message-spec.js new file mode 100644 index 000000000..1cfe63bc4 --- /dev/null +++ b/lib/shared/messages/images-message-spec.js @@ -0,0 +1,11 @@ +// @flow + +import type { ImagesMessageData } from '../../types/message/images'; +import type { MessageSpec } from './message-spec'; + +export const imagesMessageSpec: MessageSpec = Object.freeze({ + messageContent(data) { + const mediaIDs = data.media.map((media) => parseInt(media.id, 10)); + return JSON.stringify(mediaIDs); + }, +}); diff --git a/lib/shared/messages/join-thread-message-spec.js b/lib/shared/messages/join-thread-message-spec.js new file mode 100644 index 000000000..004c61055 --- /dev/null +++ b/lib/shared/messages/join-thread-message-spec.js @@ -0,0 +1,8 @@ +// @flow + +import type { JoinThreadMessageData } from '../../types/message/join-thread'; +import type { MessageSpec } from './message-spec'; + +export const joinThreadMessageSpec: MessageSpec = Object.freeze( + {}, +); diff --git a/lib/shared/messages/leave-thread-message-spec.js b/lib/shared/messages/leave-thread-message-spec.js new file mode 100644 index 000000000..e38480311 --- /dev/null +++ b/lib/shared/messages/leave-thread-message-spec.js @@ -0,0 +1,8 @@ +// @flow + +import type { LeaveThreadMessageData } from '../../types/message/leave-thread'; +import type { MessageSpec } from './message-spec'; + +export const leaveThreadMessageSpec: MessageSpec = Object.freeze( + {}, +); diff --git a/lib/shared/messages/message-spec.js b/lib/shared/messages/message-spec.js new file mode 100644 index 000000000..18685e04e --- /dev/null +++ b/lib/shared/messages/message-spec.js @@ -0,0 +1,5 @@ +// @flow + +export type MessageSpec = {| + +messageContent?: (data: Data) => string | null, +|}; diff --git a/lib/shared/messages/message-specs.js b/lib/shared/messages/message-specs.js new file mode 100644 index 000000000..1687aea51 --- /dev/null +++ b/lib/shared/messages/message-specs.js @@ -0,0 +1,44 @@ +// @flow + +import { messageTypes } from '../../types/message-types'; +import { addMembersMessageSpec } from './add-members-message-spec'; +import { changeRoleMessageSpec } from './change-role-message-spec'; +import { changeSettingsMessageSpec } from './change-settings-message-spec'; +import { createEntryMessageSpec } from './create-entry-message-spec'; +import { createSidebarMessageSpec } from './create-sidebar-message-spec'; +import { createSubThreadMessageSpec } from './create-sub-thread-message-spec'; +import { createThreadMessageSpec } from './create-thread-message-spec'; +import { deleteEntryMessageSpec } from './delete-entry-message-spec'; +import { editEntryMessageSpec } from './edit-entry-message-spec'; +import { imagesMessageSpec } from './images-message-spec'; +import { joinThreadMessageSpec } from './join-thread-message-spec'; +import { leaveThreadMessageSpec } from './leave-thread-message-spec'; +import { multimediaMessageSpec } from './multimedia-message-spec'; +import { removeMembersMessageSpec } from './remove-members-message-spec'; +import { restoreEntryMessageSpec } from './restore-entry-message-spec'; +import { sidebarSourceMessageSpec } from './sidebar-source-message-spec'; +import { textMessageSpec } from './text-message-spec'; +import { unsupportedMessageSpec } from './unsupported-message-spec'; +import { updateRelationshipMessageSpec } from './update-relationship-message-spec'; + +export const messageSpecs = Object.freeze({ + [messageTypes.TEXT]: textMessageSpec, + [messageTypes.CREATE_THREAD]: createThreadMessageSpec, + [messageTypes.ADD_MEMBERS]: addMembersMessageSpec, + [messageTypes.CREATE_SUB_THREAD]: createSubThreadMessageSpec, + [messageTypes.CHANGE_SETTINGS]: changeSettingsMessageSpec, + [messageTypes.REMOVE_MEMBERS]: removeMembersMessageSpec, + [messageTypes.CHANGE_ROLE]: changeRoleMessageSpec, + [messageTypes.LEAVE_THREAD]: leaveThreadMessageSpec, + [messageTypes.JOIN_THREAD]: joinThreadMessageSpec, + [messageTypes.CREATE_ENTRY]: createEntryMessageSpec, + [messageTypes.EDIT_ENTRY]: editEntryMessageSpec, + [messageTypes.DELETE_ENTRY]: deleteEntryMessageSpec, + [messageTypes.RESTORE_ENTRY]: restoreEntryMessageSpec, + [messageTypes.UNSUPPORTED]: unsupportedMessageSpec, + [messageTypes.IMAGES]: imagesMessageSpec, + [messageTypes.MULTIMEDIA]: multimediaMessageSpec, + [messageTypes.UPDATE_RELATIONSHIP]: updateRelationshipMessageSpec, + [messageTypes.SIDEBAR_SOURCE]: sidebarSourceMessageSpec, + [messageTypes.CREATE_SIDEBAR]: createSidebarMessageSpec, +}); diff --git a/lib/shared/messages/multimedia-message-spec.js b/lib/shared/messages/multimedia-message-spec.js new file mode 100644 index 000000000..42dba6621 --- /dev/null +++ b/lib/shared/messages/multimedia-message-spec.js @@ -0,0 +1,13 @@ +// @flow + +import type { MediaMessageData } from '../../types/message/media'; +import type { MessageSpec } from './message-spec'; + +export const multimediaMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + const mediaIDs = data.media.map((media) => parseInt(media.id, 10)); + return JSON.stringify(mediaIDs); + }, + }, +); diff --git a/lib/shared/messages/remove-members-message-spec.js b/lib/shared/messages/remove-members-message-spec.js new file mode 100644 index 000000000..27ded6847 --- /dev/null +++ b/lib/shared/messages/remove-members-message-spec.js @@ -0,0 +1,12 @@ +// @flow + +import type { RemoveMembersMessageData } from '../../types/message/remove-members'; +import type { MessageSpec } from './message-spec'; + +export const removeMembersMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify(data.removedUserIDs); + }, + }, +); diff --git a/lib/shared/messages/restore-entry-message-spec.js b/lib/shared/messages/restore-entry-message-spec.js new file mode 100644 index 000000000..f9bb17b98 --- /dev/null +++ b/lib/shared/messages/restore-entry-message-spec.js @@ -0,0 +1,16 @@ +// @flow + +import type { RestoreEntryMessageData } from '../../types/message/restore-entry'; +import type { MessageSpec } from './message-spec'; + +export const restoreEntryMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + entryID: data.entryID, + date: data.date, + text: data.text, + }); + }, + }, +); diff --git a/lib/shared/messages/sidebar-source-message-spec.js b/lib/shared/messages/sidebar-source-message-spec.js new file mode 100644 index 000000000..7b12cec2c --- /dev/null +++ b/lib/shared/messages/sidebar-source-message-spec.js @@ -0,0 +1,14 @@ +// @flow + +import type { SidebarSourceMessageData } from '../../types/message-types'; +import type { MessageSpec } from './message-spec'; + +export const sidebarSourceMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + sourceMessageID: data.sourceMessage.id, + }); + }, + }, +); diff --git a/lib/shared/messages/text-message-spec.js b/lib/shared/messages/text-message-spec.js new file mode 100644 index 000000000..685d28f1a --- /dev/null +++ b/lib/shared/messages/text-message-spec.js @@ -0,0 +1,10 @@ +// @flow + +import type { TextMessageData } from '../../types/message/text'; +import type { MessageSpec } from './message-spec'; + +export const textMessageSpec: MessageSpec = Object.freeze({ + messageContent(data) { + return data.text; + }, +}); diff --git a/lib/shared/messages/unsupported-message-spec.js b/lib/shared/messages/unsupported-message-spec.js new file mode 100644 index 000000000..0a004dbda --- /dev/null +++ b/lib/shared/messages/unsupported-message-spec.js @@ -0,0 +1,5 @@ +// @flow + +import type { MessageSpec } from './message-spec'; + +export const unsupportedMessageSpec: MessageSpec = Object.freeze({}); diff --git a/lib/shared/messages/update-relationship-message-spec.js b/lib/shared/messages/update-relationship-message-spec.js new file mode 100644 index 000000000..3bcc9d745 --- /dev/null +++ b/lib/shared/messages/update-relationship-message-spec.js @@ -0,0 +1,15 @@ +// @flow + +import type { UpdateRelationshipMessageData } from '../../types/message/update-relationship'; +import type { MessageSpec } from './message-spec'; + +export const updateRelationshipMessageSpec: MessageSpec = Object.freeze( + { + messageContent(data) { + return JSON.stringify({ + operation: data.operation, + targetID: data.targetID, + }); + }, + }, +); diff --git a/server/src/creators/message-creator.js b/server/src/creators/message-creator.js index 9fdf96043..01773631b 100644 --- a/server/src/creators/message-creator.js +++ b/server/src/creators/message-creator.js @@ -1,576 +1,525 @@ // @flow import invariant from 'invariant'; import { permissionLookup } from 'lib/permissions/thread-permissions'; import { rawMessageInfoFromMessageData, messageTypeGeneratesNotifs, shimUnsupportedRawMessageInfos, stripLocalIDs, } from 'lib/shared/message-utils'; +import { messageSpecs } from 'lib/shared/messages/message-specs'; import { messageTypes, messageDataLocalID, type MessageData, type RawMessageInfo, } from 'lib/types/message-types'; import { redisMessageTypes } from 'lib/types/redis-types'; import { threadPermissions } from 'lib/types/thread-types'; import { updateTypes } from 'lib/types/update-types'; import { dbQuery, SQL, appendSQLArray, mergeOrConditions, } from '../database/database'; import { fetchMessageInfoForLocalID } from '../fetchers/message-fetchers'; import { fetchOtherSessionsForViewer } from '../fetchers/session-fetchers'; import { sendPushNotifs } from '../push/send'; import { handleAsyncPromise } from '../responders/handlers'; import type { Viewer } from '../session/viewer'; import { earliestFocusedTimeConsideredCurrent } from '../shared/focused-times'; import { publisher } from '../socket/redis'; import { creationString } from '../utils/idempotent'; import createIDs from './id-creator'; import type { UpdatesForCurrentSession } from './update-creator'; import { createUpdates } from './update-creator'; type UserThreadInfo = {| +devices: Map< string, {| +deviceType: string, +deviceToken: string, +codeVersion: ?string, |}, >, +threadIDs: Set, +notFocusedThreadIDs: Set, +subthreadsCanNotify: Set, +subthreadsCanSetToUnread: Set, |}; type LatestMessagesPerUser = Map< string, $ReadOnlyMap< string, {| +latestMessage: string, +latestReadMessage?: string, |}, >, >; type LatestMessages = $ReadOnlyArray<{| +userID: string, +threadID: string, +latestMessage: string, +latestReadMessage: ?string, |}>; // Does not do permission checks! (checkThreadPermission) async function createMessages( viewer: Viewer, messageDatas: $ReadOnlyArray, updatesForCurrentSession?: UpdatesForCurrentSession = 'return', ): Promise { if (messageDatas.length === 0) { return []; } const messageInfos: RawMessageInfo[] = []; const newMessageDatas: MessageData[] = []; const existingMessages = await Promise.all( messageDatas.map((messageData) => fetchMessageInfoForLocalID(viewer, messageDataLocalID(messageData)), ), ); for (let i = 0; i < existingMessages.length; i++) { const existingMessage = existingMessages[i]; if (existingMessage) { messageInfos.push(existingMessage); } else { newMessageDatas.push(messageDatas[i]); } } if (newMessageDatas.length === 0) { return shimUnsupportedRawMessageInfos(messageInfos, viewer.platformDetails); } const ids = await createIDs('messages', newMessageDatas.length); const subthreadPermissionsToCheck: Set = new Set(); const threadsToMessageIndices: Map = new Map(); const messageInsertRows = []; for (let i = 0; i < newMessageDatas.length; i++) { const messageData = newMessageDatas[i]; const threadID = messageData.threadID; const creatorID = messageData.creatorID; if (messageData.type === messageTypes.CREATE_SUB_THREAD) { subthreadPermissionsToCheck.add(messageData.childThreadID); } let messageIndices = threadsToMessageIndices.get(threadID); if (!messageIndices) { messageIndices = []; threadsToMessageIndices.set(threadID, messageIndices); } messageIndices.push(i); - let content; - if (messageData.type === messageTypes.CREATE_THREAD) { - content = JSON.stringify(messageData.initialThreadState); - } else if (messageData.type === messageTypes.CREATE_SUB_THREAD) { - content = messageData.childThreadID; - } else if (messageData.type === messageTypes.TEXT) { - content = messageData.text; - } else if (messageData.type === messageTypes.ADD_MEMBERS) { - content = JSON.stringify(messageData.addedUserIDs); - } else if (messageData.type === messageTypes.CHANGE_SETTINGS) { - content = JSON.stringify({ - [messageData.field]: messageData.value, - }); - } else if (messageData.type === messageTypes.REMOVE_MEMBERS) { - content = JSON.stringify(messageData.removedUserIDs); - } else if (messageData.type === messageTypes.CHANGE_ROLE) { - content = JSON.stringify({ - userIDs: messageData.userIDs, - newRole: messageData.newRole, - }); - } else if ( - messageData.type === messageTypes.CREATE_ENTRY || - messageData.type === messageTypes.EDIT_ENTRY || - messageData.type === messageTypes.DELETE_ENTRY || - messageData.type === messageTypes.RESTORE_ENTRY - ) { - content = JSON.stringify({ - entryID: messageData.entryID, - date: messageData.date, - text: messageData.text, - }); - } else if ( - messageData.type === messageTypes.IMAGES || - messageData.type === messageTypes.MULTIMEDIA - ) { - const mediaIDs = []; - for (const { id } of messageData.media) { - mediaIDs.push(parseInt(id, 10)); - } - content = JSON.stringify(mediaIDs); - } else if (messageData.type === messageTypes.UPDATE_RELATIONSHIP) { - content = JSON.stringify({ - operation: messageData.operation, - targetID: messageData.targetID, - }); - } else if (messageData.type === messageTypes.SIDEBAR_SOURCE) { - content = JSON.stringify({ - sourceMessageID: messageData.sourceMessage.id, - }); - } else if (messageData.type === messageTypes.CREATE_SIDEBAR) { - content = JSON.stringify({ - ...messageData.initialThreadState, - sourceMessageAuthorID: messageData.sourceMessageAuthorID, - }); - } + const content = messageSpecs[messageData.type].messageContent?.( + messageData, + ); const creation = messageData.localID && viewer.hasSessionInfo ? creationString(viewer, messageData.localID) : null; messageInsertRows.push([ ids[i], threadID, creatorID, messageData.type, content, messageData.time, creation, ]); messageInfos.push(rawMessageInfoFromMessageData(messageData, ids[i])); } if (viewer.isScriptViewer) { await postMessageSend( viewer, threadsToMessageIndices, subthreadPermissionsToCheck, stripLocalIDs(messageInfos), updatesForCurrentSession, ); } else { // We aren't awaiting because this function calls external services and we // don't want to delay the response handleAsyncPromise( postMessageSend( viewer, threadsToMessageIndices, subthreadPermissionsToCheck, stripLocalIDs(messageInfos), updatesForCurrentSession, ), ); } const messageInsertQuery = SQL` INSERT INTO messages(id, thread, user, type, content, time, creation) VALUES ${messageInsertRows} `; await dbQuery(messageInsertQuery); if (updatesForCurrentSession !== 'return') { return []; } return shimUnsupportedRawMessageInfos(messageInfos, viewer.platformDetails); } // Handles: // (1) Sending push notifs // (2) Setting threads to unread and generating corresponding UpdateInfos // (3) Publishing to Redis so that active sockets pass on new messages async function postMessageSend( viewer: Viewer, threadsToMessageIndices: Map, subthreadPermissionsToCheck: Set, messageInfos: RawMessageInfo[], updatesForCurrentSession: UpdatesForCurrentSession, ) { let joinIndex = 0; let subthreadSelects = ''; const subthreadJoins = []; for (const subthread of subthreadPermissionsToCheck) { const index = joinIndex++; subthreadSelects += ` , stm${index}.permissions AS subthread${subthread}_permissions, stm${index}.role AS subthread${subthread}_role `; const join = SQL`LEFT JOIN memberships `; join.append(`stm${index} ON stm${index}.`); join.append(SQL`thread = ${subthread} AND `); join.append(`stm${index}.user = m.user`); subthreadJoins.push(join); } const time = earliestFocusedTimeConsideredCurrent(); const visibleExtractString = `$.${threadPermissions.VISIBLE}.value`; const query = SQL` SELECT m.user, m.thread, c.platform, c.device_token, c.versions, f.user AS focused_user `; query.append(subthreadSelects); query.append(SQL` FROM memberships m LEFT JOIN cookies c ON c.user = m.user AND c.device_token IS NOT NULL LEFT JOIN focused f ON f.user = m.user AND f.thread = m.thread AND f.time > ${time} `); appendSQLArray(query, subthreadJoins, SQL` `); query.append(SQL` WHERE (m.role > 0 OR f.user IS NOT NULL) AND JSON_EXTRACT(m.permissions, ${visibleExtractString}) IS TRUE AND m.thread IN (${[...threadsToMessageIndices.keys()]}) `); const perUserInfo = new Map(); const [result] = await dbQuery(query); for (const row of result) { const userID = row.user.toString(); const threadID = row.thread.toString(); const deviceToken = row.device_token; const focusedUser = !!row.focused_user; const { platform, versions } = row; let thisUserInfo = perUserInfo.get(userID); if (!thisUserInfo) { thisUserInfo = { devices: new Map(), threadIDs: new Set(), notFocusedThreadIDs: new Set(), subthreadsCanNotify: new Set(), subthreadsCanSetToUnread: new Set(), }; perUserInfo.set(userID, thisUserInfo); // Subthread info will be the same for each subthread, so we only parse // it once for (const subthread of subthreadPermissionsToCheck) { const isSubthreadMember = row[`subthread${subthread}_role`] > 0; const permissions = row[`subthread${subthread}_permissions`]; const canSeeSubthread = permissionLookup( permissions, threadPermissions.KNOW_OF, ); if (!canSeeSubthread) { continue; } thisUserInfo.subthreadsCanSetToUnread.add(subthread); // Only include the notification from the superthread if there is no // notification from the subthread if ( !isSubthreadMember || !permissionLookup(permissions, threadPermissions.VISIBLE) ) { thisUserInfo.subthreadsCanNotify.add(subthread); } } } if (deviceToken) { thisUserInfo.devices.set(deviceToken, { deviceType: platform, deviceToken, codeVersion: versions ? versions.codeVersion : null, }); } thisUserInfo.threadIDs.add(threadID); if (!focusedUser) { thisUserInfo.notFocusedThreadIDs.add(threadID); } } const pushInfo = {}; const messageInfosPerUser = {}; const latestMessagesPerUser: LatestMessagesPerUser = new Map(); for (const pair of perUserInfo) { const [userID, preUserPushInfo] = pair; const { subthreadsCanNotify } = preUserPushInfo; const userPushInfo = { devices: [...preUserPushInfo.devices.values()], messageInfos: [], }; for (const threadID of preUserPushInfo.notFocusedThreadIDs) { const messageIndices = threadsToMessageIndices.get(threadID); invariant(messageIndices, `indices should exist for thread ${threadID}`); for (const messageIndex of messageIndices) { const messageInfo = messageInfos[messageIndex]; if ( (messageInfo.type !== messageTypes.CREATE_SUB_THREAD || subthreadsCanNotify.has(messageInfo.childThreadID)) && messageTypeGeneratesNotifs(messageInfo.type) && messageInfo.creatorID !== userID ) { userPushInfo.messageInfos.push(messageInfo); } } } if ( userPushInfo.devices.length > 0 && userPushInfo.messageInfos.length > 0 ) { pushInfo[userID] = userPushInfo; } const userMessageInfos = []; for (const threadID of preUserPushInfo.threadIDs) { const messageIndices = threadsToMessageIndices.get(threadID); invariant(messageIndices, `indices should exist for thread ${threadID}`); for (const messageIndex of messageIndices) { const messageInfo = messageInfos[messageIndex]; userMessageInfos.push(messageInfo); } } if (userMessageInfos.length > 0) { messageInfosPerUser[userID] = userMessageInfos; } latestMessagesPerUser.set( userID, determineLatestMessagesPerThread( preUserPushInfo, userID, threadsToMessageIndices, messageInfos, ), ); } const latestMessages = flattenLatestMessagesPerUser(latestMessagesPerUser); await Promise.all([ createReadStatusUpdates(latestMessages), redisPublish(viewer, messageInfosPerUser, updatesForCurrentSession), updateLatestMessages(latestMessages), ]); await sendPushNotifs(pushInfo); } async function redisPublish( viewer: Viewer, messageInfosPerUser: { [userID: string]: $ReadOnlyArray }, updatesForCurrentSession: UpdatesForCurrentSession, ) { const avoidBroadcastingToCurrentSession = viewer.hasSessionInfo && updatesForCurrentSession !== 'broadcast'; for (const userID in messageInfosPerUser) { if (userID === viewer.userID && avoidBroadcastingToCurrentSession) { continue; } const messageInfos = messageInfosPerUser[userID]; publisher.sendMessage( { userID }, { type: redisMessageTypes.NEW_MESSAGES, messages: messageInfos, }, ); } const viewerMessageInfos = messageInfosPerUser[viewer.userID]; if (!viewerMessageInfos || !avoidBroadcastingToCurrentSession) { return; } const sessionIDs = await fetchOtherSessionsForViewer(viewer); for (const sessionID of sessionIDs) { publisher.sendMessage( { userID: viewer.userID, sessionID }, { type: redisMessageTypes.NEW_MESSAGES, messages: viewerMessageInfos, }, ); } } function determineLatestMessagesPerThread( preUserPushInfo: UserThreadInfo, userID: string, threadsToMessageIndices: $ReadOnlyMap>, messageInfos: $ReadOnlyArray, ) { const { threadIDs, notFocusedThreadIDs, subthreadsCanSetToUnread, } = preUserPushInfo; const latestMessagesPerThread = new Map(); for (const threadID of threadIDs) { const messageIndices = threadsToMessageIndices.get(threadID); invariant(messageIndices, `indices should exist for thread ${threadID}`); for (const messageIndex of messageIndices) { const messageInfo = messageInfos[messageIndex]; if ( messageInfo.type === messageTypes.CREATE_SUB_THREAD && !subthreadsCanSetToUnread.has(messageInfo.childThreadID) ) { continue; } const messageID = messageInfo.id; invariant( messageID, 'message ID should exist in determineLatestMessagesPerThread', ); if ( notFocusedThreadIDs.has(threadID) && messageInfo.creatorID !== userID ) { latestMessagesPerThread.set(threadID, { latestMessage: messageID, }); } else { latestMessagesPerThread.set(threadID, { latestMessage: messageID, latestReadMessage: messageID, }); } } } return latestMessagesPerThread; } function flattenLatestMessagesPerUser( latestMessagesPerUser: LatestMessagesPerUser, ): LatestMessages { const result = []; for (const [userID, latestMessagesPerThread] of latestMessagesPerUser) { for (const [threadID, latestMessages] of latestMessagesPerThread) { result.push({ userID, threadID, latestMessage: latestMessages.latestMessage, latestReadMessage: latestMessages.latestReadMessage, }); } } return result; } async function createReadStatusUpdates(latestMessages: LatestMessages) { const now = Date.now(); const readStatusUpdates = latestMessages .filter((message) => !message.latestReadMessage) .map(({ userID, threadID }) => ({ type: updateTypes.UPDATE_THREAD_READ_STATUS, userID, time: now, threadID, unread: true, })); if (readStatusUpdates.length === 0) { return; } return await createUpdates(readStatusUpdates); } function updateLatestMessages(latestMessages: LatestMessages) { if (latestMessages.length === 0) { return; } const query = SQL` UPDATE memberships SET `; const lastMessageExpression = SQL` last_message = GREATEST(last_message, CASE `; const lastReadMessageExpression = SQL` , last_read_message = GREATEST(last_read_message, CASE `; let shouldUpdateLastReadMessage = false; for (const { userID, threadID, latestMessage, latestReadMessage, } of latestMessages) { lastMessageExpression.append(SQL` WHEN user = ${userID} AND thread = ${threadID} THEN ${latestMessage} `); if (latestReadMessage) { shouldUpdateLastReadMessage = true; lastReadMessageExpression.append(SQL` WHEN user = ${userID} AND thread = ${threadID} THEN ${latestReadMessage} `); } } lastMessageExpression.append(SQL` ELSE last_message END) `); lastReadMessageExpression.append(SQL` ELSE last_read_message END) `); const conditions = latestMessages.map( ({ userID, threadID }) => SQL`(user = ${userID} AND thread = ${threadID})`, ); query.append(lastMessageExpression); if (shouldUpdateLastReadMessage) { query.append(lastReadMessageExpression); } query.append(SQL`WHERE `); query.append(mergeOrConditions(conditions)); return dbQuery(query); } export default createMessages;