diff --git a/keyserver/src/push/crypto.js b/keyserver/src/push/crypto.js index 012a32d6f..ebf99b7c1 100644 --- a/keyserver/src/push/crypto.js +++ b/keyserver/src/push/crypto.js @@ -1,417 +1,425 @@ // @flow +import type { EncryptResult } from '@commapp/olm'; import apn from '@parse/node-apn'; import crypto from 'crypto'; import invariant from 'invariant'; import _cloneDeep from 'lodash/fp/cloneDeep.js'; import type { PlainTextWebNotification, WebNotification, } from 'lib/types/notif-types.js'; import { toBase64URL } from 'lib/utils/base64.js'; import type { AndroidNotification, AndroidNotificationPayload, AndroidNotificationRescind, NotificationTargetDevice, } from './types.js'; import { encryptAndUpdateOlmSession } from '../updaters/olm-session-updater.js'; import { encrypt, generateKey } from '../utils/aes-crypto-utils.js'; import { getOlmUtility } from '../utils/olm-utils.js'; async function encryptIOSNotification( cookieID: string, notification: apn.Notification, codeVersion?: ?number, notificationSizeValidator?: apn.Notification => boolean, ): Promise<{ +notification: apn.Notification, +payloadSizeExceeded: boolean, +encryptedPayloadHash?: string, +encryptionOrder?: number, }> { invariant( !notification.collapseId, 'Collapsible notifications encryption currently not implemented', ); const encryptedNotification = new apn.Notification(); encryptedNotification.id = notification.id; encryptedNotification.payload.id = notification.id; encryptedNotification.topic = notification.topic; encryptedNotification.sound = notification.aps.sound; encryptedNotification.pushType = 'alert'; encryptedNotification.mutableContent = true; const { id, ...payloadSansId } = notification.payload; const unencryptedPayload = { ...payloadSansId, badge: notification.aps.badge.toString(), merged: notification.body, }; try { const unencryptedSerializedPayload = JSON.stringify(unencryptedPayload); let dbPersistCondition; if (notificationSizeValidator) { - dbPersistCondition = ({ serializedPayload }) => { + dbPersistCondition = ({ + serializedPayload, + }: { + +[string]: EncryptResult, + }) => { const notifCopy = _cloneDeep(encryptedNotification); notifCopy.payload.encryptedPayload = serializedPayload.body; return notificationSizeValidator(notifCopy); }; } const { encryptedMessages: { serializedPayload }, dbPersistConditionViolated, encryptionOrder, } = await encryptAndUpdateOlmSession( cookieID, 'notifications', { serializedPayload: unencryptedSerializedPayload, }, dbPersistCondition, ); encryptedNotification.payload.encryptedPayload = serializedPayload.body; if (codeVersion && codeVersion >= 254 && codeVersion % 2 === 0) { encryptedNotification.aps = { alert: { body: 'ENCRYPTED' }, ...encryptedNotification.aps, }; } const encryptedPayloadHash = getOlmUtility().sha256(serializedPayload.body); return { notification: encryptedNotification, payloadSizeExceeded: !!dbPersistConditionViolated, encryptedPayloadHash, encryptionOrder, }; } catch (e) { console.log('Notification encryption failed: ' + e); encryptedNotification.body = notification.body; encryptedNotification.threadId = notification.payload.threadID; invariant( typeof notification.aps.badge === 'number', 'Unencrypted notification must have badge as a number', ); encryptedNotification.badge = notification.aps.badge; encryptedNotification.payload = { ...encryptedNotification.payload, ...notification.payload, encryptionFailed: 1, }; return { notification: encryptedNotification, payloadSizeExceeded: notificationSizeValidator ? notificationSizeValidator(_cloneDeep(encryptedNotification)) : false, }; } } async function encryptAndroidNotificationPayload( cookieID: string, unencryptedPayload: T, payloadSizeValidator?: (T | { +encryptedPayload: string }) => boolean, ): Promise<{ +resultPayload: T | { +encryptedPayload: string }, +payloadSizeExceeded: boolean, +encryptionOrder?: number, }> { try { const unencryptedSerializedPayload = JSON.stringify(unencryptedPayload); if (!unencryptedSerializedPayload) { return { resultPayload: unencryptedPayload, payloadSizeExceeded: payloadSizeValidator ? payloadSizeValidator(unencryptedPayload) : false, }; } let dbPersistCondition; if (payloadSizeValidator) { - dbPersistCondition = ({ serializedPayload }) => - payloadSizeValidator({ encryptedPayload: serializedPayload.body }); + dbPersistCondition = ({ + serializedPayload, + }: { + +[string]: EncryptResult, + }) => payloadSizeValidator({ encryptedPayload: serializedPayload.body }); } const { encryptedMessages: { serializedPayload }, dbPersistConditionViolated, encryptionOrder, } = await encryptAndUpdateOlmSession( cookieID, 'notifications', { serializedPayload: unencryptedSerializedPayload, }, dbPersistCondition, ); return { resultPayload: { encryptedPayload: serializedPayload.body }, payloadSizeExceeded: !!dbPersistConditionViolated, encryptionOrder, }; } catch (e) { console.log('Notification encryption failed: ' + e); const resultPayload = { encryptionFailed: '1', ...unencryptedPayload, }; return { resultPayload, payloadSizeExceeded: payloadSizeValidator ? payloadSizeValidator(resultPayload) : false, }; } } async function encryptAndroidNotification( cookieID: string, notification: AndroidNotification, notificationSizeValidator?: AndroidNotification => boolean, ): Promise<{ +notification: AndroidNotification, +payloadSizeExceeded: boolean, +encryptionOrder?: number, }> { const { id, badgeOnly, ...unencryptedPayload } = notification.data; let payloadSizeValidator; if (notificationSizeValidator) { payloadSizeValidator = ( payload: AndroidNotificationPayload | { +encryptedPayload: string }, ) => { return notificationSizeValidator({ data: { id, badgeOnly, ...payload } }); }; } const { resultPayload, payloadSizeExceeded, encryptionOrder } = await encryptAndroidNotificationPayload( cookieID, unencryptedPayload, payloadSizeValidator, ); return { notification: { data: { id, badgeOnly, ...resultPayload, }, }, payloadSizeExceeded, encryptionOrder, }; } async function encryptAndroidNotificationRescind( cookieID: string, notification: AndroidNotificationRescind, ): Promise { // We don't validate payload size for rescind // since they are expected to be small and // never exceed any FCM limit const { resultPayload } = await encryptAndroidNotificationPayload( cookieID, notification.data, ); return { data: resultPayload, }; } async function encryptWebNotification( cookieID: string, notification: PlainTextWebNotification, ): Promise<{ +notification: WebNotification, +encryptionOrder?: number }> { const { id, ...payloadSansId } = notification; const unencryptedSerializedPayload = JSON.stringify(payloadSansId); try { const { encryptedMessages: { serializedPayload }, encryptionOrder, } = await encryptAndUpdateOlmSession(cookieID, 'notifications', { serializedPayload: unencryptedSerializedPayload, }); return { notification: { id, encryptedPayload: serializedPayload.body }, encryptionOrder, }; } catch (e) { console.log('Notification encryption failed: ' + e); return { notification: { id, encryptionFailed: '1', ...payloadSansId, }, }; } } function prepareEncryptedIOSNotifications( devices: $ReadOnlyArray, notification: apn.Notification, codeVersion?: ?number, notificationSizeValidator?: apn.Notification => boolean, ): Promise< $ReadOnlyArray<{ +cookieID: string, +deviceToken: string, +notification: apn.Notification, +payloadSizeExceeded: boolean, +encryptedPayloadHash?: string, +encryptionOrder?: number, }>, > { const notificationPromises = devices.map( async ({ cookieID, deviceToken }) => { const notif = await encryptIOSNotification( cookieID, notification, codeVersion, notificationSizeValidator, ); return { cookieID, deviceToken, ...notif }; }, ); return Promise.all(notificationPromises); } function prepareEncryptedIOSNotificationRescind( devices: $ReadOnlyArray, notification: apn.Notification, codeVersion?: ?number, ): Promise< $ReadOnlyArray<{ +cookieID: string, +deviceToken: string, +notification: apn.Notification, }>, > { const notificationPromises = devices.map( async ({ deviceToken, cookieID }) => { const { notification: notif } = await encryptIOSNotification( cookieID, notification, codeVersion, ); return { deviceToken, cookieID, notification: notif }; }, ); return Promise.all(notificationPromises); } function prepareEncryptedAndroidNotifications( devices: $ReadOnlyArray, notification: AndroidNotification, notificationSizeValidator?: (notification: AndroidNotification) => boolean, ): Promise< $ReadOnlyArray<{ +cookieID: string, +deviceToken: string, +notification: AndroidNotification, +payloadSizeExceeded: boolean, +encryptionOrder?: number, }>, > { const notificationPromises = devices.map( async ({ deviceToken, cookieID }) => { const notif = await encryptAndroidNotification( cookieID, notification, notificationSizeValidator, ); return { deviceToken, cookieID, ...notif }; }, ); return Promise.all(notificationPromises); } function prepareEncryptedAndroidNotificationRescinds( devices: $ReadOnlyArray, notification: AndroidNotificationRescind, ): Promise< $ReadOnlyArray<{ +cookieID: string, +deviceToken: string, +notification: AndroidNotificationRescind, +encryptionOrder?: number, }>, > { const notificationPromises = devices.map( async ({ deviceToken, cookieID }) => { const notif = await encryptAndroidNotificationRescind( cookieID, notification, ); return { deviceToken, cookieID, notification: notif }; }, ); return Promise.all(notificationPromises); } function prepareEncryptedWebNotifications( devices: $ReadOnlyArray, notification: PlainTextWebNotification, ): Promise< $ReadOnlyArray<{ +deviceToken: string, +notification: WebNotification, +encryptionOrder?: number, }>, > { const notificationPromises = devices.map( async ({ deviceToken, cookieID }) => { const notif = await encryptWebNotification(cookieID, notification); return { ...notif, deviceToken }; }, ); return Promise.all(notificationPromises); } async function encryptBlobPayload(payload: string): Promise<{ +encryptionKey: string, +encryptedPayload: Blob, +encryptedPayloadHash: string, }> { const encryptionKey = await generateKey(); const encryptedPayload = await encrypt( encryptionKey, new TextEncoder().encode(payload), ); const encryptedPayloadBuffer = Buffer.from(encryptedPayload); const blobHashBase64 = await crypto .createHash('sha256') .update(encryptedPayloadBuffer) .digest('base64'); const blobHash = toBase64URL(blobHashBase64); const payloadBlob = new Blob([encryptedPayloadBuffer]); const encryptionKeyString = Buffer.from(encryptionKey).toString('base64'); return { encryptionKey: encryptionKeyString, encryptedPayload: payloadBlob, encryptedPayloadHash: blobHash, }; } export { prepareEncryptedIOSNotifications, prepareEncryptedIOSNotificationRescind, prepareEncryptedAndroidNotifications, prepareEncryptedAndroidNotificationRescinds, prepareEncryptedWebNotifications, encryptBlobPayload, }; diff --git a/keyserver/src/push/send.js b/keyserver/src/push/send.js index 085aff171..af6b2aa5b 100644 --- a/keyserver/src/push/send.js +++ b/keyserver/src/push/send.js @@ -1,1736 +1,1737 @@ // @flow import apn from '@parse/node-apn'; import type { ResponseFailure } from '@parse/node-apn'; import invariant from 'invariant'; import _cloneDeep from 'lodash/fp/cloneDeep.js'; import _flow from 'lodash/fp/flow.js'; import _groupBy from 'lodash/fp/groupBy.js'; import _mapValues from 'lodash/fp/mapValues.js'; import _pickBy from 'lodash/fp/pickBy.js'; import type { QueryResults } from 'mysql'; import t from 'tcomb'; import uuidv4 from 'uuid/v4.js'; import { oldValidUsernameRegex } from 'lib/shared/account-utils.js'; import { isUserMentioned } from 'lib/shared/mention-utils.js'; import { createMessageInfo, sortMessageInfoList, shimUnsupportedRawMessageInfos, } from 'lib/shared/message-utils.js'; import { messageSpecs } from 'lib/shared/messages/message-specs.js'; import { notifTextsForMessageInfo } from 'lib/shared/notif-utils.js'; import { isStaff } from 'lib/shared/staff-utils.js'; import { rawThreadInfoFromServerThreadInfo, threadInfoFromRawThreadInfo, } from 'lib/shared/thread-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { Platform, PlatformDetails } from 'lib/types/device-types.js'; import { messageTypes } from 'lib/types/message-types-enum.js'; import { type RawMessageInfo, type MessageData, } from 'lib/types/message-types.js'; import { rawMessageInfoValidator } from 'lib/types/message-types.js'; import type { WNSNotification, ResolvedNotifTexts, } from 'lib/types/notif-types.js'; import { resolvedNotifTextsValidator } from 'lib/types/notif-types.js'; import type { ServerThreadInfo, ThreadInfo } from 'lib/types/thread-types.js'; import { updateTypes } from 'lib/types/update-types-enum.js'; import { type GlobalUserInfo } from 'lib/types/user-types.js'; import { isDev } from 'lib/utils/dev-utils.js'; import { values } from 'lib/utils/objects.js'; import { tID, tPlatformDetails, tShape } from 'lib/utils/validation-utils.js'; import { prepareEncryptedIOSNotifications, prepareEncryptedAndroidNotifications, prepareEncryptedWebNotifications, } from './crypto.js'; import { getAPNsNotificationTopic } from './providers.js'; import { rescindPushNotifs } from './rescind.js'; import type { NotificationTargetDevice, TargetedAPNsNotification, TargetedAndroidNotification, TargetedWebNotification, TargetedWNSNotification, + AndroidNotification, } from './types.js'; import { apnPush, fcmPush, getUnreadCounts, apnMaxNotificationPayloadByteSize, fcmMaxNotificationPayloadByteSize, wnsMaxNotificationPayloadByteSize, webPush, wnsPush, type WebPushError, type WNSPushError, } from './utils.js'; import createIDs from '../creators/id-creator.js'; import { createUpdates } from '../creators/update-creator.js'; import { dbQuery, SQL, mergeOrConditions } from '../database/database.js'; import type { CollapsableNotifInfo } from '../fetchers/message-fetchers.js'; import { fetchCollapsableNotifs } from '../fetchers/message-fetchers.js'; import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchUserInfos } from '../fetchers/user-fetchers.js'; import type { Viewer } from '../session/viewer.js'; import { getENSNames } from '../utils/ens-cache.js'; import { validateOutput } from '../utils/validation-utils.js'; export type Device = { +platform: Platform, +deviceToken: string, +cookieID: string, +codeVersion: ?number, +stateVersion: ?number, }; export type PushUserInfo = { +devices: Device[], // messageInfos and messageDatas have the same key +messageInfos: RawMessageInfo[], +messageDatas: MessageData[], }; type Delivery = PushDelivery | { collapsedInto: string }; type NotificationRow = { +dbID: string, +userID: string, +threadID?: ?string, +messageID?: ?string, +collapseKey?: ?string, +deliveries: Delivery[], }; export type PushInfo = { [userID: string]: PushUserInfo }; async function sendPushNotifs(pushInfo: PushInfo) { if (Object.keys(pushInfo).length === 0) { return; } const [ unreadCounts, { usersToCollapsableNotifInfo, serverThreadInfos, userInfos }, dbIDs, ] = await Promise.all([ getUnreadCounts(Object.keys(pushInfo)), fetchInfos(pushInfo), createDBIDs(pushInfo), ]); const preparePromises: Array>> = []; const notifications: Map = new Map(); for (const userID in usersToCollapsableNotifInfo) { const threadInfos = _flow( _mapValues((serverThreadInfo: ServerThreadInfo) => { const rawThreadInfo = rawThreadInfoFromServerThreadInfo( serverThreadInfo, userID, ); if (!rawThreadInfo) { return null; } return threadInfoFromRawThreadInfo(rawThreadInfo, userID, userInfos); }), _pickBy(threadInfo => threadInfo), )(serverThreadInfos); for (const notifInfo of usersToCollapsableNotifInfo[userID]) { preparePromises.push( preparePushNotif({ notifInfo, userID, pushUserInfo: pushInfo[userID], unreadCount: unreadCounts[userID], threadInfos, userInfos, dbIDs, rowsToSave: notifications, }), ); } } const prepareResults = await Promise.all(preparePromises); const flattenedPrepareResults = prepareResults.filter(Boolean).flat(); const deliveryResults = await deliverPushNotifsInEncryptionOrder( flattenedPrepareResults, ); const cleanUpPromise = (async () => { if (dbIDs.length === 0) { return; } const query = SQL`DELETE FROM ids WHERE id IN (${dbIDs})`; await dbQuery(query); })(); await Promise.all([ cleanUpPromise, saveNotifResults(deliveryResults, notifications, true), ]); } type PreparePushResult = { +platform: Platform, +notificationInfo: NotificationInfo, +notification: | TargetedAPNsNotification | TargetedAndroidNotification | TargetedWebNotification | TargetedWNSNotification, }; async function preparePushNotif(input: { notifInfo: CollapsableNotifInfo, userID: string, pushUserInfo: PushUserInfo, unreadCount: number, threadInfos: { +[threadID: string]: ThreadInfo }, userInfos: { +[userID: string]: GlobalUserInfo }, dbIDs: string[], // mutable rowsToSave: Map, // mutable }): Promise> { const { notifInfo, userID, pushUserInfo, unreadCount, threadInfos, userInfos, dbIDs, rowsToSave, } = input; const hydrateMessageInfo = (rawMessageInfo: RawMessageInfo) => createMessageInfo(rawMessageInfo, userID, userInfos, threadInfos); const newMessageInfos = []; const newRawMessageInfos = []; for (const newRawMessageInfo of notifInfo.newMessageInfos) { const newMessageInfo = hydrateMessageInfo(newRawMessageInfo); if (newMessageInfo) { newMessageInfos.push(newMessageInfo); newRawMessageInfos.push(newRawMessageInfo); } } if (newMessageInfos.length === 0) { return null; } const existingMessageInfos = notifInfo.existingMessageInfos .map(hydrateMessageInfo) .filter(Boolean); const allMessageInfos = sortMessageInfoList([ ...newMessageInfos, ...existingMessageInfos, ]); const [firstNewMessageInfo, ...remainingNewMessageInfos] = newMessageInfos; const { threadID } = firstNewMessageInfo; const threadInfo = threadInfos[threadID]; const parentThreadInfo = threadInfo.parentThreadID ? threadInfos[threadInfo.parentThreadID] : null; const updateBadge = threadInfo.currentUser.subscription.home; const displayBanner = threadInfo.currentUser.subscription.pushNotifs; const username = userInfos[userID] && userInfos[userID].username; const userWasMentioned = username && threadInfo.currentUser.role && oldValidUsernameRegex.test(username) && newMessageInfos.some(newMessageInfo => { const unwrappedMessageInfo = newMessageInfo.type === messageTypes.SIDEBAR_SOURCE ? newMessageInfo.sourceMessage : newMessageInfo; return ( unwrappedMessageInfo.type === messageTypes.TEXT && isUserMentioned(username, unwrappedMessageInfo.text) ); }); if (!updateBadge && !displayBanner && !userWasMentioned) { return null; } const badgeOnly = !displayBanner && !userWasMentioned; const notifTargetUserInfo = { id: userID, username }; const notifTexts = await notifTextsForMessageInfo( allMessageInfos, threadInfo, parentThreadInfo, notifTargetUserInfo, getENSNames, ); if (!notifTexts) { return null; } const dbID = dbIDs.shift(); invariant(dbID, 'should have sufficient DB IDs'); const byPlatform = getDevicesByPlatform(pushUserInfo.devices); const firstMessageID = firstNewMessageInfo.id; invariant(firstMessageID, 'RawMessageInfo.id should be set on server'); const notificationInfo = { source: 'new_message', dbID, userID, threadID, messageID: firstMessageID, collapseKey: notifInfo.collapseKey, }; const preparePromises: Array>> = []; const iosVersionsToTokens = byPlatform.get('ios'); if (iosVersionsToTokens) { for (const [versionKey, devices] of iosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails: PlatformDetails = { platform: 'ios', codeVersion, stateVersion, }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareAPNsNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount, platformDetails, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'ios', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const androidVersionsToTokens = byPlatform.get('android'); if (androidVersionsToTokens) { for (const [versionKey, devices] of androidVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'android', codeVersion, stateVersion, }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareAndroidNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount, platformDetails, dbID, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'android', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const webVersionsToTokens = byPlatform.get('web'); if (webVersionsToTokens) { for (const [versionKey, devices] of webVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'web', codeVersion, stateVersion, }; const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareWebNotification( userID, { notifTexts, threadID: threadInfo.id, unreadCount, platformDetails, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'web', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const macosVersionsToTokens = byPlatform.get('macos'); if (macosVersionsToTokens) { for (const [versionKey, devices] of macosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'macos', codeVersion, stateVersion, }; const shimmedNewRawMessageInfos = shimUnsupportedRawMessageInfos( newRawMessageInfos, platformDetails, ); const preparePromise: Promise<$ReadOnlyArray> = (async () => { const targetedNotifications = await prepareAPNsNotification( { notifTexts, newRawMessageInfos: shimmedNewRawMessageInfos, threadID: threadInfo.id, collapseKey: notifInfo.collapseKey, badgeOnly, unreadCount, platformDetails, }, devices, ); return targetedNotifications.map(notification => ({ notification, platform: 'macos', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const windowsVersionsToTokens = byPlatform.get('windows'); if (windowsVersionsToTokens) { for (const [versionKey, devices] of windowsVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const platformDetails = { platform: 'windows', codeVersion, stateVersion, }; const preparePromise: Promise<$ReadOnlyArray> = (async () => { const notification = await prepareWNSNotification({ notifTexts, threadID: threadInfo.id, unreadCount, platformDetails, }); return devices.map(({ deviceToken }) => ({ notification: ({ deviceToken, notification, }: TargetedWNSNotification), platform: 'windows', notificationInfo: { ...notificationInfo, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } for (const newMessageInfo of remainingNewMessageInfos) { const newDBID = dbIDs.shift(); invariant(newDBID, 'should have sufficient DB IDs'); const messageID = newMessageInfo.id; invariant(messageID, 'RawMessageInfo.id should be set on server'); rowsToSave.set(newDBID, { dbID: newDBID, userID, threadID: newMessageInfo.threadID, messageID, collapseKey: notifInfo.collapseKey, deliveries: [{ collapsedInto: dbID }], }); } const prepareResults = await Promise.all(preparePromises); return prepareResults.flat(); } // For better readability we don't differentiate between // encrypted and unencrypted notifs and order them together function compareEncryptionOrder( pushNotif1: PreparePushResult, pushNotif2: PreparePushResult, ): number { const order1 = pushNotif1.notification.encryptionOrder ?? 0; const order2 = pushNotif2.notification.encryptionOrder ?? 0; return order1 - order2; } async function deliverPushNotifsInEncryptionOrder( preparedPushNotifs: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const deliveryPromises: Array>> = []; const groupedByDevice = _groupBy( preparedPushNotif => preparedPushNotif.deviceToken, )(preparedPushNotifs); for (const preparedPushNotifsForDevice of values(groupedByDevice)) { const orderedPushNotifsForDevice = preparedPushNotifsForDevice.sort( compareEncryptionOrder, ); const deviceDeliveryPromise = (async () => { const deliveries = []; for (const preparedPushNotif of orderedPushNotifsForDevice) { const { platform, notification, notificationInfo } = preparedPushNotif; let delivery: PushResult; if (platform === 'ios' || platform === 'macos') { delivery = await sendAPNsNotification( platform, [notification], notificationInfo, ); } else if (platform === 'android') { delivery = await sendAndroidNotification( [notification], notificationInfo, ); } else if (platform === 'web') { delivery = await sendWebNotifications( [notification], notificationInfo, ); } else if (platform === 'windows') { delivery = await sendWNSNotification( [notification], notificationInfo, ); } if (delivery) { deliveries.push(delivery); } } return deliveries; })(); deliveryPromises.push(deviceDeliveryPromise); } const deliveryResults = await Promise.all(deliveryPromises); return deliveryResults.flat(); } async function sendRescindNotifs(rescindInfo: PushInfo) { if (Object.keys(rescindInfo).length === 0) { return; } const usersToCollapsableNotifInfo = await fetchCollapsableNotifs(rescindInfo); const promises = []; for (const userID in usersToCollapsableNotifInfo) { for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const existingMessageInfo of notifInfo.existingMessageInfos) { const rescindCondition = SQL` n.user = ${userID} AND n.thread = ${existingMessageInfo.threadID} AND n.message = ${existingMessageInfo.id} `; promises.push(rescindPushNotifs(rescindCondition)); } } } await Promise.all(promises); } // The results in deliveryResults will be combined with the rows // in rowsToSave and then written to the notifications table async function saveNotifResults( deliveryResults: $ReadOnlyArray, inputRowsToSave: Map, rescindable: boolean, ) { const rowsToSave = new Map(inputRowsToSave); const allInvalidTokens = []; for (const deliveryResult of deliveryResults) { const { info, delivery, invalidTokens } = deliveryResult; const { dbID, userID } = info; const curNotifRow = rowsToSave.get(dbID); if (curNotifRow) { curNotifRow.deliveries.push(delivery); } else { // Ternary expressions for Flow const threadID = info.threadID ? info.threadID : null; const messageID = info.messageID ? info.messageID : null; const collapseKey = info.collapseKey ? info.collapseKey : null; rowsToSave.set(dbID, { dbID, userID, threadID, messageID, collapseKey, deliveries: [delivery], }); } if (invalidTokens) { allInvalidTokens.push({ userID, tokens: invalidTokens, }); } } const notificationRows = []; for (const notification of rowsToSave.values()) { notificationRows.push([ notification.dbID, notification.userID, notification.threadID, notification.messageID, notification.collapseKey, JSON.stringify(notification.deliveries), Number(!rescindable), ]); } const dbPromises: Array> = []; if (allInvalidTokens.length > 0) { dbPromises.push(removeInvalidTokens(allInvalidTokens)); } if (notificationRows.length > 0) { const query = SQL` INSERT INTO notifications (id, user, thread, message, collapse_key, delivery, rescinded) VALUES ${notificationRows} `; dbPromises.push(dbQuery(query)); } if (dbPromises.length > 0) { await Promise.all(dbPromises); } } async function fetchInfos(pushInfo: PushInfo) { const usersToCollapsableNotifInfo = await fetchCollapsableNotifs(pushInfo); const threadIDs = new Set(); const threadWithChangedNamesToMessages = new Map>(); const addThreadIDsFromMessageInfos = (rawMessageInfo: RawMessageInfo) => { const threadID = rawMessageInfo.threadID; threadIDs.add(threadID); const messageSpec = messageSpecs[rawMessageInfo.type]; if (messageSpec.threadIDs) { for (const id of messageSpec.threadIDs(rawMessageInfo)) { threadIDs.add(id); } } if ( rawMessageInfo.type === messageTypes.CHANGE_SETTINGS && rawMessageInfo.field === 'name' ) { const messages = threadWithChangedNamesToMessages.get(threadID); if (messages) { messages.push(rawMessageInfo.id); } else { threadWithChangedNamesToMessages.set(threadID, [rawMessageInfo.id]); } } }; for (const userID in usersToCollapsableNotifInfo) { for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const rawMessageInfo of notifInfo.existingMessageInfos) { addThreadIDsFromMessageInfos(rawMessageInfo); } for (const rawMessageInfo of notifInfo.newMessageInfos) { addThreadIDsFromMessageInfos(rawMessageInfo); } } } // These threadInfos won't have currentUser set const threadPromise = fetchServerThreadInfos({ threadIDs }); const oldNamesPromise: Promise = (async () => { if (threadWithChangedNamesToMessages.size === 0) { return undefined; } const typesThatAffectName = [ messageTypes.CHANGE_SETTINGS, messageTypes.CREATE_THREAD, ]; const oldNameQuery = SQL` SELECT IF( JSON_TYPE(JSON_EXTRACT(m.content, "$.name")) = 'NULL', "", JSON_UNQUOTE(JSON_EXTRACT(m.content, "$.name")) ) AS name, m.thread FROM ( SELECT MAX(id) AS id FROM messages WHERE type IN (${typesThatAffectName}) AND JSON_EXTRACT(content, "$.name") IS NOT NULL AND`; const threadClauses = []; for (const [threadID, messages] of threadWithChangedNamesToMessages) { threadClauses.push( SQL`(thread = ${threadID} AND id NOT IN (${messages}))`, ); } oldNameQuery.append(mergeOrConditions(threadClauses)); oldNameQuery.append(SQL` GROUP BY thread ) x LEFT JOIN messages m ON m.id = x.id `); return await dbQuery(oldNameQuery); })(); const [threadResult, oldNames] = await Promise.all([ threadPromise, oldNamesPromise, ]); const serverThreadInfos = threadResult.threadInfos; if (oldNames) { const [result] = oldNames; for (const row of result) { const threadID = row.thread.toString(); serverThreadInfos[threadID].name = row.name; } } const userInfos = await fetchNotifUserInfos( serverThreadInfos, usersToCollapsableNotifInfo, ); return { usersToCollapsableNotifInfo, serverThreadInfos, userInfos }; } async function fetchNotifUserInfos( serverThreadInfos: { +[threadID: string]: ServerThreadInfo }, usersToCollapsableNotifInfo: { +[userID: string]: CollapsableNotifInfo[] }, ) { const missingUserIDs = new Set(); for (const threadID in serverThreadInfos) { const serverThreadInfo = serverThreadInfos[threadID]; for (const member of serverThreadInfo.members) { missingUserIDs.add(member.id); } } const addUserIDsFromMessageInfos = (rawMessageInfo: RawMessageInfo) => { missingUserIDs.add(rawMessageInfo.creatorID); const userIDs = messageSpecs[rawMessageInfo.type].userIDs?.(rawMessageInfo) ?? []; for (const userID of userIDs) { missingUserIDs.add(userID); } }; for (const userID in usersToCollapsableNotifInfo) { missingUserIDs.add(userID); for (const notifInfo of usersToCollapsableNotifInfo[userID]) { for (const rawMessageInfo of notifInfo.existingMessageInfos) { addUserIDsFromMessageInfos(rawMessageInfo); } for (const rawMessageInfo of notifInfo.newMessageInfos) { addUserIDsFromMessageInfos(rawMessageInfo); } } } return await fetchUserInfos([...missingUserIDs]); } async function createDBIDs(pushInfo: PushInfo): Promise { let numIDsNeeded = 0; for (const userID in pushInfo) { numIDsNeeded += pushInfo[userID].messageInfos.length; } return await createIDs('notifications', numIDsNeeded); } type VersionKey = { codeVersion: number, stateVersion: number }; const versionKeyRegex: RegExp = new RegExp(/^-?\d+\|-?\d+$/); function versionKeyToString(versionKey: VersionKey): string { return `${versionKey.codeVersion}|${versionKey.stateVersion}`; } function stringToVersionKey(versionKeyString: string): VersionKey { invariant( versionKeyRegex.test(versionKeyString), 'should pass correct version key string', ); const [codeVersion, stateVersion] = versionKeyString.split('|').map(Number); return { codeVersion, stateVersion }; } function getDevicesByPlatform( devices: $ReadOnlyArray, ): Map>> { const byPlatform = new Map< Platform, Map>, >(); for (const device of devices) { let innerMap = byPlatform.get(device.platform); if (!innerMap) { innerMap = new Map>(); byPlatform.set(device.platform, innerMap); } const codeVersion: number = device.codeVersion !== null && device.codeVersion !== undefined && device.platform !== 'windows' && device.platform !== 'macos' ? device.codeVersion : -1; const stateVersion: number = device.stateVersion ?? -1; const versionKey = versionKeyToString({ codeVersion, stateVersion, }); let innerMostArrayTmp: ?Array = innerMap.get(versionKey); if (!innerMostArrayTmp) { innerMostArrayTmp = []; innerMap.set(versionKey, innerMostArrayTmp); } const innerMostArray = innerMostArrayTmp; innerMostArray.push({ cookieID: device.cookieID, deviceToken: device.deviceToken, }); } return byPlatform; } type APNsNotifInputData = { +notifTexts: ResolvedNotifTexts, +newRawMessageInfos: RawMessageInfo[], +threadID: string, +collapseKey: ?string, +badgeOnly: boolean, +unreadCount: number, +platformDetails: PlatformDetails, }; const apnsNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, newRawMessageInfos: t.list(rawMessageInfoValidator), threadID: tID, collapseKey: t.maybe(t.String), badgeOnly: t.Boolean, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareAPNsNotification( inputData: APNsNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, apnsNotifInputDataValidator, inputData, ); const { notifTexts, newRawMessageInfos, threadID, collapseKey, badgeOnly, unreadCount, platformDetails, } = convertedData; const canDecryptNonCollapsibleTextNotifs = platformDetails.codeVersion && platformDetails.codeVersion > 222; const isNonCollapsibleTextNotification = newRawMessageInfos.every( newRawMessageInfo => newRawMessageInfo.type === messageTypes.TEXT, ) && !collapseKey; const canDecryptAllNotifTypes = platformDetails.codeVersion && platformDetails.codeVersion >= 267; const shouldBeEncrypted = platformDetails.platform === 'ios' && (canDecryptAllNotifTypes || (isNonCollapsibleTextNotification && canDecryptNonCollapsibleTextNotifs)); const uniqueID = uuidv4(); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic(platformDetails); const { merged, ...rest } = notifTexts; // We don't include alert's body on macos because we // handle displaying the notification ourselves and // we don't want macOS to display it automatically. if (!badgeOnly && platformDetails.platform !== 'macos') { notification.body = merged; notification.sound = 'default'; } notification.payload = { ...notification.payload, ...rest, }; notification.badge = unreadCount; notification.threadId = threadID; notification.id = uniqueID; notification.pushType = 'alert'; notification.payload.id = uniqueID; notification.payload.threadID = threadID; if (platformDetails.codeVersion && platformDetails.codeVersion > 198) { notification.mutableContent = true; } if (collapseKey && canDecryptAllNotifTypes) { notification.payload.collapseID = collapseKey; } else if (collapseKey) { notification.collapseId = collapseKey; } const messageInfos = JSON.stringify(newRawMessageInfos); // We make a copy before checking notification's length, because calling // length compiles the notification and makes it immutable. Further // changes to its properties won't be reflected in the final plaintext // data that is sent. const copyWithMessageInfos = _cloneDeep(notification); copyWithMessageInfos.payload = { ...copyWithMessageInfos.payload, messageInfos, }; - const notificationSizeValidator = notif => + const notificationSizeValidator = (notif: apn.Notification) => notif.length() <= apnMaxNotificationPayloadByteSize; if (!shouldBeEncrypted) { const notificationToSend = notificationSizeValidator( _cloneDeep(copyWithMessageInfos), ) ? copyWithMessageInfos : notification; return devices.map(({ deviceToken }) => ({ notification: notificationToSend, deviceToken, })); } const notifsWithMessageInfos = await prepareEncryptedIOSNotifications( devices, copyWithMessageInfos, platformDetails.codeVersion, notificationSizeValidator, ); const devicesWithExcessiveSize = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => payloadSizeExceeded) .map(({ deviceToken, cookieID }) => ({ deviceToken, cookieID })); if (devicesWithExcessiveSize.length === 0) { return notifsWithMessageInfos.map( ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }) => ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }), ); } const notifsWithoutMessageInfos = await prepareEncryptedIOSNotifications( devicesWithExcessiveSize, notification, platformDetails.codeVersion, ); const targetedNotifsWithMessageInfos = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => !payloadSizeExceeded) .map( ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }) => ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }), ); const targetedNotifsWithoutMessageInfos = notifsWithoutMessageInfos.map( ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }) => ({ notification: notif, deviceToken, encryptedPayloadHash, encryptionOrder, }), ); return [ ...targetedNotifsWithMessageInfos, ...targetedNotifsWithoutMessageInfos, ]; } type AndroidNotifInputData = { ...APNsNotifInputData, +dbID: string, }; const androidNotifInputDataValidator = tShape({ ...apnsNotifInputDataValidator.meta.props, dbID: t.String, }); async function prepareAndroidNotification( inputData: AndroidNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, androidNotifInputDataValidator, inputData, ); const { notifTexts, newRawMessageInfos, threadID, collapseKey, badgeOnly, unreadCount, platformDetails: { codeVersion }, dbID, } = convertedData; const canDecryptNonCollapsibleTextNotifs = codeVersion && codeVersion > 228; const isNonCollapsibleTextNotif = newRawMessageInfos.every( newRawMessageInfo => newRawMessageInfo.type === messageTypes.TEXT, ) && !collapseKey; const canDecryptAllNotifTypes = codeVersion && codeVersion >= 267; const shouldBeEncrypted = canDecryptAllNotifTypes || (canDecryptNonCollapsibleTextNotifs && isNonCollapsibleTextNotif); const { merged, ...rest } = notifTexts; const notification = { data: { badge: unreadCount.toString(), ...rest, threadID, }, }; let notifID; if (collapseKey && canDecryptAllNotifTypes) { notifID = dbID; notification.data = { ...notification.data, collapseKey, }; } else if (collapseKey) { notifID = collapseKey; } else { notifID = dbID; } // The reason we only include `badgeOnly` for newer clients is because older // clients don't know how to parse it. The reason we only include `id` for // newer clients is that if the older clients see that field, they assume // the notif has a full payload, and then crash when trying to parse it. // By skipping `id` we allow old clients to still handle in-app notifs and // badge updating. if (!badgeOnly || (codeVersion && codeVersion >= 69)) { notification.data = { ...notification.data, id: notifID, badgeOnly: badgeOnly ? '1' : '0', }; } const messageInfos = JSON.stringify(newRawMessageInfos); const copyWithMessageInfos = { ...notification, data: { ...notification.data, messageInfos }, }; if (!shouldBeEncrypted) { const notificationToSend = Buffer.byteLength(JSON.stringify(copyWithMessageInfos)) <= fcmMaxNotificationPayloadByteSize ? copyWithMessageInfos : notification; return devices.map(({ deviceToken }) => ({ notification: notificationToSend, deviceToken, })); } - const notificationsSizeValidator = notif => { + const notificationsSizeValidator = (notif: AndroidNotification) => { const serializedNotif = JSON.stringify(notif); return ( !serializedNotif || Buffer.byteLength(serializedNotif) <= fcmMaxNotificationPayloadByteSize ); }; const notifsWithMessageInfos = await prepareEncryptedAndroidNotifications( devices, copyWithMessageInfos, notificationsSizeValidator, ); const devicesWithExcessiveSize = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => payloadSizeExceeded) .map(({ cookieID, deviceToken }) => ({ cookieID, deviceToken })); if (devicesWithExcessiveSize.length === 0) { return notifsWithMessageInfos.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); } const notifsWithoutMessageInfos = await prepareEncryptedAndroidNotifications( devicesWithExcessiveSize, notification, ); const targetedNotifsWithMessageInfos = notifsWithMessageInfos .filter(({ payloadSizeExceeded }) => !payloadSizeExceeded) .map(({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, })); const targetedNotifsWithoutMessageInfos = notifsWithoutMessageInfos.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); return [ ...targetedNotifsWithMessageInfos, ...targetedNotifsWithoutMessageInfos, ]; } type WebNotifInputData = { +notifTexts: ResolvedNotifTexts, +threadID: string, +unreadCount: number, +platformDetails: PlatformDetails, }; const webNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, threadID: tID, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareWebNotification( userID: string, inputData: WebNotifInputData, devices: $ReadOnlyArray, ): Promise<$ReadOnlyArray> { const convertedData = validateOutput( inputData.platformDetails, webNotifInputDataValidator, inputData, ); const { notifTexts, threadID, unreadCount } = convertedData; const id = uuidv4(); const { merged, ...rest } = notifTexts; const notification = { ...rest, unreadCount, id, threadID, }; const isStaffOrDev = isStaff(userID) || isDev; const shouldBeEncrypted = hasMinCodeVersion(convertedData.platformDetails, { web: 43, }) && isStaffOrDev; if (!shouldBeEncrypted) { return devices.map(({ deviceToken }) => ({ deviceToken, notification })); } return prepareEncryptedWebNotifications(devices, notification); } type WNSNotifInputData = { +notifTexts: ResolvedNotifTexts, +threadID: string, +unreadCount: number, +platformDetails: PlatformDetails, }; const wnsNotifInputDataValidator = tShape({ notifTexts: resolvedNotifTextsValidator, threadID: tID, unreadCount: t.Number, platformDetails: tPlatformDetails, }); async function prepareWNSNotification( inputData: WNSNotifInputData, ): Promise { const convertedData = validateOutput( inputData.platformDetails, wnsNotifInputDataValidator, inputData, ); const { notifTexts, threadID, unreadCount } = convertedData; const { merged, ...rest } = notifTexts; const notification = { ...rest, unreadCount, threadID, }; if ( Buffer.byteLength(JSON.stringify(notification)) > wnsMaxNotificationPayloadByteSize ) { console.warn('WNS notification exceeds size limit'); } return notification; } type NotificationInfo = | { +source: 'new_message', +dbID: string, +userID: string, +threadID: string, +messageID: string, +collapseKey: ?string, +codeVersion: number, +stateVersion: number, } | { +source: 'mark_as_unread' | 'mark_as_read' | 'activity_update', +dbID: string, +userID: string, +codeVersion: number, +stateVersion: number, }; type APNsDelivery = { +source: $PropertyType, +deviceType: 'ios' | 'macos', +iosID: string, +deviceTokens: $ReadOnlyArray, +codeVersion: number, +stateVersion: number, +errors?: $ReadOnlyArray, +encryptedPayloadHashes?: $ReadOnlyArray, +deviceTokensToPayloadHash?: { +[deviceToken: string]: string, }, }; type APNsResult = { info: NotificationInfo, delivery: APNsDelivery, invalidTokens?: $ReadOnlyArray, }; async function sendAPNsNotification( platform: 'ios' | 'macos', targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion, stateVersion } = notificationInfo; const response = await apnPush({ targetedNotifications, platformDetails: { platform, codeVersion }, }); invariant( new Set(targetedNotifications.map(({ notification }) => notification.id)) .size === 1, 'Encrypted versions of the same notification must share id value', ); const iosID = targetedNotifications[0].notification.id; const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); let delivery: APNsDelivery = { source, deviceType: platform, iosID, deviceTokens, codeVersion, stateVersion, }; if (response.errors) { delivery = { ...delivery, errors: response.errors, }; } const deviceTokensToPayloadHash: { [string]: string } = {}; for (const targetedNotification of targetedNotifications) { if (targetedNotification.encryptedPayloadHash) { deviceTokensToPayloadHash[targetedNotification.deviceToken] = targetedNotification.encryptedPayloadHash; } } if (Object.keys(deviceTokensToPayloadHash).length !== 0) { delivery = { ...delivery, deviceTokensToPayloadHash, }; } const result: APNsResult = { info: notificationInfo, delivery, }; if (response.invalidTokens) { result.invalidTokens = response.invalidTokens; } return result; } type PushResult = AndroidResult | APNsResult | WebResult | WNSResult; type PushDelivery = AndroidDelivery | APNsDelivery | WebDelivery | WNSDelivery; type AndroidDelivery = { source: $PropertyType, deviceType: 'android', androidIDs: $ReadOnlyArray, deviceTokens: $ReadOnlyArray, codeVersion: number, stateVersion: number, errors?: $ReadOnlyArray, }; type AndroidResult = { info: NotificationInfo, delivery: AndroidDelivery, invalidTokens?: $ReadOnlyArray, }; async function sendAndroidNotification( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const collapseKey = notificationInfo.collapseKey ? notificationInfo.collapseKey : null; // for Flow... const { source, codeVersion, stateVersion } = notificationInfo; const response = await fcmPush({ targetedNotifications, collapseKey, codeVersion, }); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const androidIDs = response.fcmIDs ? response.fcmIDs : []; const delivery: AndroidDelivery = { source, deviceType: 'android', androidIDs, deviceTokens, codeVersion, stateVersion, }; if (response.errors) { delivery.errors = response.errors; } const result: AndroidResult = { info: notificationInfo, delivery, }; if (response.invalidTokens) { result.invalidTokens = response.invalidTokens; } return result; } type WebDelivery = { +source: $PropertyType, +deviceType: 'web', +deviceTokens: $ReadOnlyArray, +codeVersion?: number, +stateVersion: number, +errors?: $ReadOnlyArray, }; type WebResult = { +info: NotificationInfo, +delivery: WebDelivery, +invalidTokens?: $ReadOnlyArray, }; async function sendWebNotifications( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion, stateVersion } = notificationInfo; const response = await webPush(targetedNotifications); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const delivery: WebDelivery = { source, deviceType: 'web', deviceTokens, codeVersion, errors: response.errors, stateVersion, }; const result: WebResult = { info: notificationInfo, delivery, invalidTokens: response.invalidTokens, }; return result; } type WNSDelivery = { +source: $PropertyType, +deviceType: 'windows', +wnsIDs: $ReadOnlyArray, +deviceTokens: $ReadOnlyArray, +codeVersion?: number, +stateVersion: number, +errors?: $ReadOnlyArray, }; type WNSResult = { +info: NotificationInfo, +delivery: WNSDelivery, +invalidTokens?: $ReadOnlyArray, }; async function sendWNSNotification( targetedNotifications: $ReadOnlyArray, notificationInfo: NotificationInfo, ): Promise { const { source, codeVersion, stateVersion } = notificationInfo; const response = await wnsPush(targetedNotifications); const deviceTokens = targetedNotifications.map( ({ deviceToken }) => deviceToken, ); const wnsIDs = response.wnsIDs ?? []; const delivery: WNSDelivery = { source, deviceType: 'windows', wnsIDs, deviceTokens, codeVersion, errors: response.errors, stateVersion, }; const result: WNSResult = { info: notificationInfo, delivery, invalidTokens: response.invalidTokens, }; return result; } type InvalidToken = { +userID: string, +tokens: $ReadOnlyArray, }; async function removeInvalidTokens( invalidTokens: $ReadOnlyArray, ): Promise { const sqlTuples = invalidTokens.map( invalidTokenUser => SQL`( user = ${invalidTokenUser.userID} AND device_token IN (${invalidTokenUser.tokens}) )`, ); const sqlCondition = mergeOrConditions(sqlTuples); const selectQuery = SQL` SELECT id, user, device_token FROM cookies WHERE `; selectQuery.append(sqlCondition); const [result] = await dbQuery(selectQuery); const userCookiePairsToInvalidDeviceTokens = new Map>(); for (const row of result) { const userCookiePair = `${row.user}|${row.id}`; const existing = userCookiePairsToInvalidDeviceTokens.get(userCookiePair); if (existing) { existing.add(row.device_token); } else { userCookiePairsToInvalidDeviceTokens.set( userCookiePair, new Set([row.device_token]), ); } } const time = Date.now(); const promises: Array> = []; for (const entry of userCookiePairsToInvalidDeviceTokens) { const [userCookiePair, deviceTokens] = entry; const [userID, cookieID] = userCookiePair.split('|'); const updateDatas = [...deviceTokens].map(deviceToken => ({ type: updateTypes.BAD_DEVICE_TOKEN, userID, time, deviceToken, targetCookie: cookieID, })); promises.push(createUpdates(updateDatas)); } const updateQuery = SQL` UPDATE cookies SET device_token = NULL WHERE `; updateQuery.append(sqlCondition); promises.push(dbQuery(updateQuery)); await Promise.all(promises); } async function updateBadgeCount( viewer: Viewer, source: 'mark_as_unread' | 'mark_as_read' | 'activity_update', ) { const { userID } = viewer; const deviceTokenQuery = SQL` SELECT platform, device_token, versions, id FROM cookies WHERE user = ${userID} AND device_token IS NOT NULL `; if (viewer.data.cookieID) { deviceTokenQuery.append(SQL`AND id != ${viewer.cookieID} `); } const [unreadCounts, [deviceTokenResult], [dbID]] = await Promise.all([ getUnreadCounts([userID]), dbQuery(deviceTokenQuery), createIDs('notifications', 1), ]); const unreadCount = unreadCounts[userID]; const devices = deviceTokenResult.map(row => { const versions = JSON.parse(row.versions); return { platform: row.platform, cookieID: row.id, deviceToken: row.device_token, codeVersion: versions?.codeVersion, stateVersion: versions?.stateVersion, }; }); const byPlatform = getDevicesByPlatform(devices); const preparePromises: Array>> = []; const iosVersionsToTokens = byPlatform.get('ios'); if (iosVersionsToTokens) { for (const [versionKey, deviceInfos] of iosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'ios', codeVersion, stateVersion, }); notification.badge = unreadCount; notification.pushType = 'alert'; const preparePromise: Promise = (async () => { let targetedNotifications: $ReadOnlyArray; if (codeVersion > 222) { const notificationsArray = await prepareEncryptedIOSNotifications( deviceInfos, notification, codeVersion, ); targetedNotifications = notificationsArray.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); } else { targetedNotifications = deviceInfos.map(({ deviceToken }) => ({ notification, deviceToken, })); } return targetedNotifications.map(targetedNotification => ({ notification: targetedNotification, platform: 'ios', notificationInfo: { source, dbID, userID, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const androidVersionsToTokens = byPlatform.get('android'); if (androidVersionsToTokens) { for (const [versionKey, deviceInfos] of androidVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const notificationData = codeVersion < 69 ? { badge: unreadCount.toString() } : { badge: unreadCount.toString(), badgeOnly: '1' }; const notification = { data: notificationData }; const preparePromise: Promise = (async () => { let targetedNotifications: $ReadOnlyArray; if (codeVersion > 222) { const notificationsArray = await prepareEncryptedAndroidNotifications( deviceInfos, notification, ); targetedNotifications = notificationsArray.map( ({ notification: notif, deviceToken, encryptionOrder }) => ({ notification: notif, deviceToken, encryptionOrder, }), ); } else { targetedNotifications = deviceInfos.map(({ deviceToken }) => ({ deviceToken, notification, })); } return targetedNotifications.map(targetedNotification => ({ notification: targetedNotification, platform: 'android', notificationInfo: { source, dbID, userID, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const macosVersionsToTokens = byPlatform.get('macos'); if (macosVersionsToTokens) { for (const [versionKey, deviceInfos] of macosVersionsToTokens) { const { codeVersion, stateVersion } = stringToVersionKey(versionKey); const notification = new apn.Notification(); notification.topic = getAPNsNotificationTopic({ platform: 'macos', codeVersion, stateVersion, }); notification.badge = unreadCount; notification.pushType = 'alert'; const preparePromise: Promise = (async () => { return deviceInfos.map(({ deviceToken }) => ({ notification: ({ deviceToken, notification, }: TargetedAPNsNotification), platform: 'macos', notificationInfo: { source, dbID, userID, codeVersion, stateVersion, }, })); })(); preparePromises.push(preparePromise); } } const prepareResults = await Promise.all(preparePromises); const flattenedPrepareResults = prepareResults.filter(Boolean).flat(); const deliveryResults = await deliverPushNotifsInEncryptionOrder( flattenedPrepareResults, ); await saveNotifResults(deliveryResults, new Map(), false); } export { sendPushNotifs, sendRescindNotifs, updateBadgeCount }; diff --git a/keyserver/src/scripts/generate-converter-from-validator.js b/keyserver/src/scripts/generate-converter-from-validator.js index da1416cfd..5f833b0b7 100644 --- a/keyserver/src/scripts/generate-converter-from-validator.js +++ b/keyserver/src/scripts/generate-converter-from-validator.js @@ -1,234 +1,234 @@ // @flow import t, { type TInterface, type TType } from 'tcomb'; import { nativeMediaSelectionValidator, mediaValidator, } from 'lib/types/media-types.js'; import { threadPermissionInfoValidator } from 'lib/types/thread-permission-types.js'; import { rawThreadInfoValidator } from 'lib/types/thread-types.js'; import { ashoatKeyserverID, tID } from 'lib/utils/validation-utils.js'; import { main } from './utils.js'; function getDiscriminatorFieldForUnionValidator(validator: TType) { if (validator === threadPermissionInfoValidator) { return 'value'; } if (validator === nativeMediaSelectionValidator) { return 'step'; } return 'type'; } function flattenInnerUnionValidators( innerValidators: $ReadOnlyArray>, ): TInterface<{ +[string]: mixed }>[] { let result: TInterface<{ +[string]: mixed }>[] = []; for (const innerValidator of innerValidators) { if (innerValidator.meta.kind === 'interface') { // In flow, union refinement only works if every variant has a key // that is a literal. In this case we don't get a refinement of // `innerValidator` because we are checking value of the inner // `meta` object. const recastValidator: TInterface<{ +[string]: mixed }> = (innerValidator: any); result.push(recastValidator); } else if (innerValidator.meta.kind === 'union') { result = [ ...result, ...flattenInnerUnionValidators(innerValidator.meta.types), ]; } else if ([t.String, t.Number, t.Boolean].includes(innerValidator)) { // We don't need to handle literal types because they can't be // converted } else { throw new Error( `Validator not supported in union: ${innerValidator.displayName}`, ); } } return result; } // MediaValidator is special cased because of flow issues function getConverterForMediaValidator(inputName: string) { return `(${inputName}.type === 'photo' ? { ...${inputName}, id: '256|' + ${inputName}.id } : ${inputName}.type === 'video' ? { ...${inputName}, id: '256|' + ${inputName}.id, thumbnailID: '256|' + ${inputName}.thumbnailID, } : ${inputName}.type === 'encrypted_photo' ? ({ ...${inputName}, id: '256|' + ${inputName}.id }: any) : ${inputName}.type === 'encrypted_video' ? ({ ...${inputName}, id: '256|' + ${inputName}.id, thumbnailID: '256|' + ${inputName}.thumbnailID, }: any) : ${inputName})`; } // `null` is returned if there is no conversion needed in T or any // of it's inner types function generateConverterFromValidator( validator: TType, inputName: string, validatorToBeConverted: TType, conversionExpressionString: (inputName: string) => string, ): ?string { if (validator === validatorToBeConverted) { return `(${conversionExpressionString(inputName)})`; } else if (validator === mediaValidator) { return getConverterForMediaValidator(inputName); } if (validator.meta.kind === 'maybe') { const inner = generateConverterFromValidator( validator.meta.type, inputName, validatorToBeConverted, conversionExpressionString, ); if (!inner) { return null; } return `((${inputName} !== null && ${inputName} !== undefined) ? (${inner}) : (${inputName}))`; } if (validator.meta.kind === 'subtype') { return generateConverterFromValidator( validator.meta.type, inputName, validatorToBeConverted, conversionExpressionString, ); } if (validator.meta.kind === 'interface') { // In flow, union refinement only works if every variant has a key // that is a literal. In this case we don't get a refinement of `validator` // because we are checking value of the inner `meta` object. const recastValidator: TInterface<{ +[string]: mixed }> = (validator: any); const fieldConverters = []; for (const key in recastValidator.meta.props) { const inner = generateConverterFromValidator( recastValidator.meta.props[key], `${inputName}.${key}`, validatorToBeConverted, conversionExpressionString, ); if (inner) { fieldConverters.push(`${key}:${inner}`); } } if (fieldConverters.length === 0) { return null; } return `({...${inputName}, ${fieldConverters.join(',')}})`; } if (validator.meta.kind === 'union') { const innerValidators = flattenInnerUnionValidators(validator.meta.types); const variantConverters = []; for (const innerValidator of innerValidators) { const discriminatorField = getDiscriminatorFieldForUnionValidator(validator); const discriminatorValidator = innerValidator.meta.props[discriminatorField]; if (!discriminatorValidator) { throw new Error( 'Union should have a discriminator ' + validator.displayName, ); } const discriminatorValue = discriminatorValidator.meta.name; const inner = generateConverterFromValidator( innerValidator, inputName, validatorToBeConverted, conversionExpressionString, ); if (inner) { variantConverters.push( `(${inputName}.${discriminatorField} === ${discriminatorValue}) ? (${inner})`, ); } } if (variantConverters.length === 0) { return null; } variantConverters.push(`(${inputName})`); return `(${variantConverters.join(':')})`; } if (validator.meta.kind === 'list') { const inner = generateConverterFromValidator( validator.meta.type, 'elem', validatorToBeConverted, conversionExpressionString, ); if (!inner) { return inputName; } return `(${inputName}.map(elem => ${inner}))`; } if (validator.meta.kind === 'dict') { const domainValidator = validator.meta.domain; const codomainValidator = validator.meta.codomain; let domainConverter = null; if (domainValidator === validatorToBeConverted) { domainConverter = conversionExpressionString('key'); } let codomainConverter = generateConverterFromValidator( codomainValidator, 'value', validatorToBeConverted, conversionExpressionString, ); if (!domainConverter && !codomainConverter) { return null; } domainConverter = domainConverter ?? 'key'; codomainConverter = codomainConverter ?? 'value'; return `(Object.fromEntries( entries(${inputName}).map( ([key, value]) => [${domainConverter}, ${codomainConverter}] ) ))`; } return null; } // Input arguments: const validator = rawThreadInfoValidator; const typeName = 'RawThreadInfo'; const validatorToBeConverted = tID; -const conversionExpressionString = inputName => +const conversionExpressionString = (inputName: string) => `'${ashoatKeyserverID}|' + ${inputName}`; main([ async () => { console.log( `export function convert${typeName}ToNewIDSchema(input: ${typeName}): ${typeName} { return`, generateConverterFromValidator( validator, 'input', validatorToBeConverted, conversionExpressionString, ) ?? 'input', ';}', ); }, ]); diff --git a/keyserver/src/socket/socket.js b/keyserver/src/socket/socket.js index 55433cda2..13a8862a4 100644 --- a/keyserver/src/socket/socket.js +++ b/keyserver/src/socket/socket.js @@ -1,886 +1,886 @@ // @flow import type { $Request } from 'express'; import invariant from 'invariant'; import _debounce from 'lodash/debounce.js'; import t from 'tcomb'; import type { TUnion } from 'tcomb'; import WebSocket from 'ws'; import { baseLegalPolicies } from 'lib/facts/policies.js'; import { mostRecentMessageTimestamp } from 'lib/shared/message-utils.js'; import { isStaff } from 'lib/shared/staff-utils.js'; import { serverRequestSocketTimeout, serverResponseTimeout, } from 'lib/shared/timeouts.js'; import { mostRecentUpdateTimestamp } from 'lib/shared/update-utils.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { Shape } from 'lib/types/core.js'; import { endpointIsSocketSafe } from 'lib/types/endpoints.js'; import type { RawEntryInfo } from 'lib/types/entry-types.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import { redisMessageTypes, type RedisMessage } from 'lib/types/redis-types.js'; import { serverRequestTypes } from 'lib/types/request-types.js'; import { sessionCheckFrequency, stateCheckInactivityActivationInterval, } from 'lib/types/session-types.js'; import { type ClientSocketMessage, type InitialClientSocketMessage, type ResponsesClientSocketMessage, type ServerStateSyncFullSocketPayload, type ServerServerSocketMessage, type ErrorServerSocketMessage, type AuthErrorServerSocketMessage, type PingClientSocketMessage, type AckUpdatesClientSocketMessage, type APIRequestClientSocketMessage, clientSocketMessageTypes, stateSyncPayloadTypes, serverSocketMessageTypes, serverServerSocketMessageValidator, } from 'lib/types/socket-types.js'; import type { RawThreadInfos } from 'lib/types/thread-types.js'; import type { UserInfo, CurrentUserInfo } from 'lib/types/user-types.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import SequentialPromiseResolver from 'lib/utils/sequential-promise-resolver.js'; import sleep from 'lib/utils/sleep.js'; import { tShape, tCookie } from 'lib/utils/validation-utils.js'; import { RedisSubscriber } from './redis.js'; import { clientResponseInputValidator, processClientResponses, initializeSession, checkState, } from './session-utils.js'; import { fetchUpdateInfosWithRawUpdateInfos } from '../creators/update-creator.js'; import { deleteActivityForViewerSession } from '../deleters/activity-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { deleteUpdatesBeforeTimeTargetingSession } from '../deleters/update-deleters.js'; import { jsonEndpoints } from '../endpoints.js'; import { fetchMessageInfosSince, getMessageFetchResultFromRedisMessages, } from '../fetchers/message-fetchers.js'; import { fetchUpdateInfos } from '../fetchers/update-fetchers.js'; import { newEntryQueryInputValidator, verifyCalendarQueryThreadIDs, } from '../responders/entry-responders.js'; import { handleAsyncPromise } from '../responders/handlers.js'; import { fetchViewerForSocket, updateCookie, isCookieMissingSignedIdentityKeysBlob, isCookieMissingOlmNotificationsSession, createNewAnonymousCookie, } from '../session/cookies.js'; import { Viewer } from '../session/viewer.js'; import type { AnonymousViewerData } from '../session/viewer.js'; import { serverStateSyncSpecs } from '../shared/state-sync/state-sync-specs.js'; import { commitSessionUpdate } from '../updaters/session-updaters.js'; import { compressMessage } from '../utils/compress.js'; import { assertSecureRequest } from '../utils/security-utils.js'; import { checkInputValidator, checkClientSupported, policiesValidator, validateOutput, } from '../utils/validation-utils.js'; const clientSocketMessageInputValidator: TUnion = t.union([ tShape({ type: t.irreducible( 'clientSocketMessageTypes.INITIAL', x => x === clientSocketMessageTypes.INITIAL, ), id: t.Number, payload: tShape({ sessionIdentification: tShape({ cookie: t.maybe(tCookie), sessionID: t.maybe(t.String), }), sessionState: tShape({ calendarQuery: newEntryQueryInputValidator, messagesCurrentAsOf: t.Number, updatesCurrentAsOf: t.Number, watchedIDs: t.list(t.String), }), clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.RESPONSES', x => x === clientSocketMessageTypes.RESPONSES, ), id: t.Number, payload: tShape({ clientResponses: t.list(clientResponseInputValidator), }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.PING', x => x === clientSocketMessageTypes.PING, ), id: t.Number, }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.ACK_UPDATES', x => x === clientSocketMessageTypes.ACK_UPDATES, ), id: t.Number, payload: tShape({ currentAsOf: t.Number, }), }), tShape({ type: t.irreducible( 'clientSocketMessageTypes.API_REQUEST', x => x === clientSocketMessageTypes.API_REQUEST, ), id: t.Number, payload: tShape({ endpoint: t.String, input: t.maybe(t.Object), }), }), ]); function onConnection(ws: WebSocket, req: $Request) { assertSecureRequest(req); new Socket(ws, req); } type StateCheckConditions = { activityRecentlyOccurred: boolean, stateCheckOngoing: boolean, }; const minVersionsForCompression = { native: 265, web: 30, }; class Socket { ws: WebSocket; httpRequest: $Request; viewer: ?Viewer; redis: ?RedisSubscriber; redisPromiseResolver: SequentialPromiseResolver; stateCheckConditions: StateCheckConditions = { activityRecentlyOccurred: true, stateCheckOngoing: false, }; stateCheckTimeoutID: ?TimeoutID; constructor(ws: WebSocket, httpRequest: $Request) { this.ws = ws; this.httpRequest = httpRequest; ws.on('message', this.onMessage); ws.on('close', this.onClose); this.resetTimeout(); this.redisPromiseResolver = new SequentialPromiseResolver(this.sendMessage); } onMessage = async ( messageString: string | Buffer | ArrayBuffer | Array, - ) => { + ): Promise => { invariant(typeof messageString === 'string', 'message should be string'); let clientSocketMessage: ?ClientSocketMessage; try { this.resetTimeout(); const messageObject = JSON.parse(messageString); clientSocketMessage = checkInputValidator( clientSocketMessageInputValidator, messageObject, ); if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { if (this.viewer) { // This indicates that the user sent multiple INITIAL messages. throw new ServerError('socket_already_initialized'); } this.viewer = await fetchViewerForSocket( this.httpRequest, clientSocketMessage, ); } const { viewer } = this; if (!viewer) { // This indicates a non-INITIAL message was sent by the client before // the INITIAL message. throw new ServerError('socket_uninitialized'); } if (viewer.sessionChanged) { // This indicates that the cookie was invalid, and we've assigned a new // anonymous one. throw new ServerError('socket_deauthorized'); } if (!viewer.loggedIn) { // This indicates that the specified cookie was an anonymous one. throw new ServerError('not_logged_in'); } await checkClientSupported( viewer, clientSocketMessageInputValidator, clientSocketMessage, ); await policiesValidator(viewer, baseLegalPolicies); const serverResponses = await this.handleClientSocketMessage( clientSocketMessage, ); if (!this.redis) { this.redis = new RedisSubscriber( { userID: viewer.userID, sessionID: viewer.session }, this.onRedisMessage, ); } if (viewer.sessionChanged) { // This indicates that something has caused the session to change, which // shouldn't happen from inside a WebSocket since we can't handle cookie // invalidation. throw new ServerError('session_mutated_from_socket'); } if (clientSocketMessage.type !== clientSocketMessageTypes.PING) { handleAsyncPromise(updateCookie(viewer)); } for (const response of serverResponses) { // Normally it's an anti-pattern to await in sequence like this. But in // this case, we have a requirement that this array of serverResponses // is delivered in order. See here: // https://github.com/CommE2E/comm/blob/101eb34481deb49c609bfd2c785f375886e52666/keyserver/src/socket/socket.js#L566-L568 await this.sendMessage(response); } if (clientSocketMessage.type === clientSocketMessageTypes.INITIAL) { this.onSuccessfulConnection(); } } catch (error) { console.warn(error); if (!(error instanceof ServerError)) { const errorMessage: ErrorServerSocketMessage = { type: serverSocketMessageTypes.ERROR, message: error.message, }; const responseTo = clientSocketMessage ? clientSocketMessage.id : null; if (responseTo !== null) { errorMessage.responseTo = responseTo; } this.markActivityOccurred(); await this.sendMessage(errorMessage); return; } invariant(clientSocketMessage, 'should be set'); const responseTo = clientSocketMessage.id; if (error.message === 'socket_deauthorized') { invariant(this.viewer, 'should be set'); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: this.viewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4100, error.message); return; } else if (error.message === 'client_version_unsupported') { const { viewer } = this; invariant(viewer, 'should be set'); const anonymousViewerDataPromise: Promise = createNewAnonymousCookie({ platformDetails: error.platformDetails, deviceToken: viewer.deviceToken, }); const deleteCookiePromise = deleteCookie(viewer.cookieID); const [anonymousViewerData] = await Promise.all([ anonymousViewerDataPromise, deleteCookiePromise, ]); // It is normally not safe to pass the result of // createNewAnonymousCookie to the Viewer constructor. That is because // createNewAnonymousCookie leaves several fields of // AnonymousViewerData unset, and consequently Viewer will throw when // access is attempted. It is only safe here because we can guarantee // that only cookiePairString and cookieID are accessed on anonViewer // below. const anonViewer = new Viewer(anonymousViewerData); const authErrorMessage: AuthErrorServerSocketMessage = { type: serverSocketMessageTypes.AUTH_ERROR, responseTo, message: error.message, sessionChange: { cookie: anonViewer.cookiePairString, currentUserInfo: { anonymous: true, }, }, }; await this.sendMessage(authErrorMessage); this.ws.close(4101, error.message); return; } if (error.payload) { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, payload: error.payload, }); } else { await this.sendMessage({ type: serverSocketMessageTypes.ERROR, responseTo, message: error.message, }); } if (error.message === 'not_logged_in') { this.ws.close(4102, error.message); } else if (error.message === 'session_mutated_from_socket') { this.ws.close(4103, error.message); } else { this.markActivityOccurred(); } } }; onClose = async () => { this.clearStateCheckTimeout(); this.resetTimeout.cancel(); this.debouncedAfterActivity.cancel(); if (this.viewer && this.viewer.hasSessionInfo) { await deleteActivityForViewerSession(this.viewer); } if (this.redis) { this.redis.quit(); this.redis = null; } }; sendMessage = async (message: ServerServerSocketMessage) => { invariant( this.ws.readyState > 0, "shouldn't send message until connection established", ); if (this.ws.readyState !== 1) { return; } const { viewer } = this; const validatedMessage = validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, message, ); const stringMessage = JSON.stringify(validatedMessage); if ( !viewer?.platformDetails || !hasMinCodeVersion(viewer.platformDetails, minVersionsForCompression) || !isStaff(viewer.id) ) { this.ws.send(stringMessage); return; } const compressionResult = await compressMessage(stringMessage); if (this.ws.readyState !== 1) { return; } if (!compressionResult.compressed) { this.ws.send(stringMessage); return; } const compressedMessage = { type: serverSocketMessageTypes.COMPRESSED_MESSAGE, payload: compressionResult.result, }; const validatedCompressedMessage = validateOutput( viewer?.platformDetails, serverServerSocketMessageValidator, compressedMessage, ); const stringCompressedMessage = JSON.stringify(validatedCompressedMessage); this.ws.send(stringCompressedMessage); }; async handleClientSocketMessage( message: ClientSocketMessage, ): Promise { const resultPromise: Promise = (async () => { if (message.type === clientSocketMessageTypes.INITIAL) { this.markActivityOccurred(); return await this.handleInitialClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.RESPONSES) { this.markActivityOccurred(); return await this.handleResponsesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.PING) { return this.handlePingClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.ACK_UPDATES) { this.markActivityOccurred(); return await this.handleAckUpdatesClientSocketMessage(message); } else if (message.type === clientSocketMessageTypes.API_REQUEST) { this.markActivityOccurred(); return await this.handleAPIRequestClientSocketMessage(message); } return []; })(); const timeoutPromise: Promise = (async () => { await sleep(serverResponseTimeout); throw new ServerError('socket_response_timeout'); })(); return await Promise.race([resultPromise, timeoutPromise]); } async handleInitialClientSocketMessage( message: InitialClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const responses: Array = []; const { sessionState, clientResponses } = message.payload; const { calendarQuery, updatesCurrentAsOf: oldUpdatesCurrentAsOf, messagesCurrentAsOf: oldMessagesCurrentAsOf, watchedIDs, } = sessionState; await verifyCalendarQueryThreadIDs(calendarQuery); const sessionInitializationResult = await initializeSession( viewer, calendarQuery, oldUpdatesCurrentAsOf, ); const threadCursors: { [string]: null } = {}; for (const watchedThreadID of watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true, newerThan: oldMessagesCurrentAsOf, }; const [fetchMessagesResult, { serverRequests, activityUpdateResult }] = await Promise.all([ fetchMessageInfosSince( viewer, messageSelectionCriteria, defaultNumberPerThread, ), processClientResponses(viewer, clientResponses), ]); const messagesResult = { rawMessageInfos: fetchMessagesResult.rawMessageInfos, truncationStatuses: fetchMessagesResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( fetchMessagesResult.rawMessageInfos, oldMessagesCurrentAsOf, ), }; const isCookieMissingSignedIdentityKeysBlobPromise = isCookieMissingSignedIdentityKeysBlob(viewer.cookieID); const isCookieMissingOlmNotificationsSessionPromise = isCookieMissingOlmNotificationsSession(viewer); if (!sessionInitializationResult.sessionContinued) { const promises: { +[string]: Promise } = Object.fromEntries( values(serverStateSyncSpecs).map(spec => [ spec.hashKey, spec.fetchFullSocketSyncPayload(viewer, [calendarQuery]), ]), ); // We have a type error here because Flow doesn't know spec.hashKey const castPromises: { +threadInfos: Promise, +currentUserInfo: Promise, +entryInfos: Promise<$ReadOnlyArray>, +userInfos: Promise<$ReadOnlyArray>, } = (promises: any); const results = await promiseAll(castPromises); const payload: ServerStateSyncFullSocketPayload = { type: stateSyncPayloadTypes.FULL, messagesResult, threadInfos: results.threadInfos, currentUserInfo: results.currentUserInfo, rawEntryInfos: results.entryInfos, userInfos: results.userInfos, updatesCurrentAsOf: oldUpdatesCurrentAsOf, }; if (viewer.sessionChanged) { // If initializeSession encounters, // sessionIdentifierTypes.BODY_SESSION_ID but the session // is unspecified or expired, // it will set a new sessionID and specify viewer.sessionChanged const { sessionID } = viewer; invariant( sessionID !== null && sessionID !== undefined, 'should be set', ); payload.sessionID = sessionID; viewer.sessionChanged = false; } responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload, }); } else { const { sessionUpdate, deltaEntryInfoResult } = sessionInitializationResult; const deleteExpiredUpdatesPromise = deleteUpdatesBeforeTimeTargetingSession(viewer, oldUpdatesCurrentAsOf); const fetchUpdateResultPromise = fetchUpdateInfos( viewer, oldUpdatesCurrentAsOf, calendarQuery, ); const sessionUpdatePromise = commitSessionUpdate(viewer, sessionUpdate); const [fetchUpdateResult] = await Promise.all([ fetchUpdateResultPromise, deleteExpiredUpdatesPromise, sessionUpdatePromise, ]); const { updateInfos, userInfos } = fetchUpdateResult; const newUpdatesCurrentAsOf = mostRecentUpdateTimestamp( [...updateInfos], oldUpdatesCurrentAsOf, ); const updatesResult = { newUpdates: updateInfos, currentAsOf: newUpdatesCurrentAsOf, }; responses.push({ type: serverSocketMessageTypes.STATE_SYNC, responseTo: message.id, payload: { type: stateSyncPayloadTypes.INCREMENTAL, messagesResult, updatesResult, deltaEntryInfos: deltaEntryInfoResult.rawEntryInfos, deletedEntryIDs: deltaEntryInfoResult.deletedEntryIDs, userInfos: values(userInfos), }, }); } const [signedIdentityKeysBlobMissing, olmNotificationsSessionMissing] = await Promise.all([ isCookieMissingSignedIdentityKeysBlobPromise, isCookieMissingOlmNotificationsSessionPromise, ]); if (signedIdentityKeysBlobMissing) { serverRequests.push({ type: serverRequestTypes.SIGNED_IDENTITY_KEYS_BLOB, }); } if (olmNotificationsSessionMissing) { serverRequests.push({ type: serverRequestTypes.INITIAL_NOTIFICATIONS_ENCRYPTED_MESSAGE, }); } if (serverRequests.length > 0 || clientResponses.length > 0) { // We send this message first since the STATE_SYNC triggers the client's // connection status to shift to "connected", and we want to make sure the // client responses are cleared from Redux before that happens responses.unshift({ type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }); } if (activityUpdateResult) { // Same reason for unshifting as above responses.unshift({ type: serverSocketMessageTypes.ACTIVITY_UPDATE_RESPONSE, responseTo: message.id, payload: activityUpdateResult, }); } return responses; } async handleResponsesClientSocketMessage( message: ResponsesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { clientResponses } = message.payload; const { stateCheckStatus } = await processClientResponses( viewer, clientResponses, ); const serverRequests = []; if (stateCheckStatus && stateCheckStatus.status !== 'state_check') { const { sessionUpdate, checkStateRequest } = await checkState( viewer, stateCheckStatus, ); if (sessionUpdate) { await commitSessionUpdate(viewer, sessionUpdate); this.setStateCheckConditions({ stateCheckOngoing: false }); } if (checkStateRequest) { serverRequests.push(checkStateRequest); } } // We send a response message regardless of whether we have any requests, // since we need to ack the client's responses return [ { type: serverSocketMessageTypes.REQUESTS, responseTo: message.id, payload: { serverRequests }, }, ]; } handlePingClientSocketMessage( message: PingClientSocketMessage, ): ServerServerSocketMessage[] { return [ { type: serverSocketMessageTypes.PONG, responseTo: message.id, }, ]; } async handleAckUpdatesClientSocketMessage( message: AckUpdatesClientSocketMessage, ): Promise { const { viewer } = this; invariant(viewer, 'should be set'); const { currentAsOf } = message.payload; await Promise.all([ deleteUpdatesBeforeTimeTargetingSession(viewer, currentAsOf), commitSessionUpdate(viewer, { lastUpdate: currentAsOf }), ]); return []; } async handleAPIRequestClientSocketMessage( message: APIRequestClientSocketMessage, ): Promise { if (!endpointIsSocketSafe(message.payload.endpoint)) { throw new ServerError('endpoint_unsafe_for_socket'); } const { viewer } = this; invariant(viewer, 'should be set'); const responder = jsonEndpoints[message.payload.endpoint]; await policiesValidator(viewer, responder.requiredPolicies); const response = await responder.responder(viewer, message.payload.input); return [ { type: serverSocketMessageTypes.API_RESPONSE, responseTo: message.id, payload: response, }, ]; } onRedisMessage = async (message: RedisMessage) => { try { await this.processRedisMessage(message); } catch (e) { console.warn(e); } }; async processRedisMessage(message: RedisMessage) { if (message.type === redisMessageTypes.START_SUBSCRIPTION) { this.ws.terminate(); } else if (message.type === redisMessageTypes.NEW_UPDATES) { const { viewer } = this; invariant(viewer, 'should be set'); if (message.ignoreSession && message.ignoreSession === viewer.session) { return; } const rawUpdateInfos = message.updates; this.redisPromiseResolver.add( (async () => { const { updateInfos, userInfos } = await fetchUpdateInfosWithRawUpdateInfos(rawUpdateInfos, { viewer, }); if (updateInfos.length === 0) { console.warn( 'could not get any UpdateInfos from redisMessageTypes.NEW_UPDATES', ); return null; } this.markActivityOccurred(); return { type: serverSocketMessageTypes.UPDATES, payload: { updatesResult: { currentAsOf: mostRecentUpdateTimestamp([...updateInfos], 0), newUpdates: updateInfos, }, userInfos: values(userInfos), }, }; })(), ); } else if (message.type === redisMessageTypes.NEW_MESSAGES) { const { viewer } = this; invariant(viewer, 'should be set'); const rawMessageInfos = message.messages; const messageFetchResult = getMessageFetchResultFromRedisMessages( viewer, rawMessageInfos, ); if (messageFetchResult.rawMessageInfos.length === 0) { console.warn( 'could not get any rawMessageInfos from ' + 'redisMessageTypes.NEW_MESSAGES', ); return; } this.redisPromiseResolver.add( (async () => { this.markActivityOccurred(); return { type: serverSocketMessageTypes.MESSAGES, payload: { messagesResult: { rawMessageInfos: messageFetchResult.rawMessageInfos, truncationStatuses: messageFetchResult.truncationStatuses, currentAsOf: mostRecentMessageTimestamp( messageFetchResult.rawMessageInfos, 0, ), }, }, }; })(), ); } } onSuccessfulConnection() { if (this.ws.readyState !== 1) { return; } this.handleStateCheckConditionsUpdate(); } // The Socket will timeout by calling this.ws.terminate() // serverRequestSocketTimeout milliseconds after the last // time resetTimeout is called - resetTimeout = _debounce( + resetTimeout: { +cancel: () => void } & (() => void) = _debounce( () => this.ws.terminate(), serverRequestSocketTimeout, ); - debouncedAfterActivity = _debounce( + debouncedAfterActivity: { +cancel: () => void } & (() => void) = _debounce( () => this.setStateCheckConditions({ activityRecentlyOccurred: false }), stateCheckInactivityActivationInterval, ); markActivityOccurred = () => { if (this.ws.readyState !== 1) { return; } this.setStateCheckConditions({ activityRecentlyOccurred: true }); this.debouncedAfterActivity(); }; clearStateCheckTimeout() { const { stateCheckTimeoutID } = this; if (stateCheckTimeoutID) { clearTimeout(stateCheckTimeoutID); this.stateCheckTimeoutID = null; } } setStateCheckConditions(newConditions: Shape) { this.stateCheckConditions = { ...this.stateCheckConditions, ...newConditions, }; this.handleStateCheckConditionsUpdate(); } - get stateCheckCanStart() { + get stateCheckCanStart(): boolean { return Object.values(this.stateCheckConditions).every(cond => !cond); } handleStateCheckConditionsUpdate() { if (!this.stateCheckCanStart) { this.clearStateCheckTimeout(); return; } if (this.stateCheckTimeoutID) { return; } const { viewer } = this; if (!viewer) { return; } const timeUntilStateCheck = viewer.sessionLastValidated + sessionCheckFrequency - Date.now(); if (timeUntilStateCheck <= 0) { this.initiateStateCheck(); } else { this.stateCheckTimeoutID = setTimeout( this.initiateStateCheck, timeUntilStateCheck, ); } } initiateStateCheck = async () => { this.setStateCheckConditions({ stateCheckOngoing: true }); const { viewer } = this; invariant(viewer, 'should be set'); const { checkStateRequest } = await checkState(viewer, { status: 'state_check', }); invariant(checkStateRequest, 'should be set'); await this.sendMessage({ type: serverSocketMessageTypes.REQUESTS, payload: { serverRequests: [checkStateRequest] }, }); }; } export { onConnection }; diff --git a/keyserver/src/utils/depth-queue.js b/keyserver/src/utils/depth-queue.js index 0d71f3be4..15dcecc78 100644 --- a/keyserver/src/utils/depth-queue.js +++ b/keyserver/src/utils/depth-queue.js @@ -1,74 +1,74 @@ // @flow class DepthQueue { depthQueues: Map> = new Map(); maxEnqueuedDepth: number = -1; maxDequeuedDepth: number = -1; getDepth: (a: T) => number; getKey: (a: T) => string; mergeFunction: (a: T, b: T) => T; constructor( getDepth: (a: T) => number, getKey: (a: T) => string, mergeFunction: (a: T, b: T) => T, ) { this.getDepth = getDepth; this.getKey = getKey; this.mergeFunction = mergeFunction; } - addInfo(info: T) { + addInfo(info: T): void { const depth = this.getDepth(info); if (depth <= this.maxDequeuedDepth) { throw new Error( `try to addInfo at depth ${depth} after having dequeued at depth ` + this.maxDequeuedDepth, ); } let map = this.depthQueues.get(depth); if (!map) { map = new Map(); this.depthQueues.set(depth, map); } const key = this.getKey(info); const curInfo = map.get(key); const mergedInfo = curInfo ? this.mergeFunction(curInfo, info) : info; map.set(key, mergedInfo); if (depth > this.maxEnqueuedDepth) { this.maxEnqueuedDepth = depth; } } addInfos(infos: $ReadOnlyArray) { for (const info of infos) { this.addInfo(info); } } getNextDepth(): ?(T[]) { let i = this.maxDequeuedDepth + 1; while (i <= this.maxEnqueuedDepth) { const depthMap = this.depthQueues.get(i++); if (!depthMap) { continue; } this.maxDequeuedDepth = i - 1; const result = [...depthMap.values()]; this.depthQueues.delete(i - 1); return result; } return undefined; } empty(): boolean { for (let i = this.maxDequeuedDepth + 1; i <= this.maxEnqueuedDepth; i++) { if (this.depthQueues.has(i)) { return false; } } return true; } } export default DepthQueue; diff --git a/keyserver/src/utils/olm-utils.test.js b/keyserver/src/utils/olm-utils.test.js index 141e34c50..1eabe968f 100644 --- a/keyserver/src/utils/olm-utils.test.js +++ b/keyserver/src/utils/olm-utils.test.js @@ -1,218 +1,223 @@ // @flow import olm from '@commapp/olm'; import { getOlmUtility } from '../utils/olm-utils.js'; describe('olm.Account', () => { const alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789 '; - const randomString = length => + const randomString = (length: number) => Array.from( { length }, () => alphabet[Math.floor(Math.random() * alphabet.length)], ).join(''); - const initAccount = (mark_prekey_published = true) => { + const initAccount = (mark_prekey_published: boolean = true) => { const account = new olm.Account(); account.create(); account.generate_prekey(); account.generate_one_time_keys(1); if (mark_prekey_published) { account.mark_prekey_as_published(); } return account; }; const createSession = ( - aliceSession, - aliceAccount, - bobAccount, - regen = false, - forget = false, - invalid_sign = false, + aliceSession: olm.Session, + aliceAccount: olm.Account, + bobAccount: olm.Account, + regen: boolean = false, + forget: boolean = false, + invalid_sign: boolean = false, ) => { const bobOneTimeKeys = JSON.parse(bobAccount.one_time_keys()).curve25519; bobAccount.mark_keys_as_published(); const otk_id = Object.keys(bobOneTimeKeys)[0]; if (regen) { bobAccount.generate_prekey(); if (forget) { bobAccount.forget_old_prekey(); } } if (invalid_sign) { try { aliceSession.create_outbound( aliceAccount, JSON.parse(bobAccount.identity_keys()).curve25519, JSON.parse(bobAccount.identity_keys()).ed25519, String(Object.values(JSON.parse(bobAccount.prekey()).curve25519)[0]), bobAccount.sign(randomString(32)), bobOneTimeKeys[otk_id], ); } catch (error) { expect(error.message).toBe('OLM.BAD_SIGNATURE'); return false; } try { aliceSession.create_outbound( aliceAccount, JSON.parse(bobAccount.identity_keys()).curve25519, JSON.parse(bobAccount.identity_keys()).ed25519, String(Object.values(JSON.parse(bobAccount.prekey()).curve25519)[0]), randomString(43), bobOneTimeKeys[otk_id], ); } catch (error) { expect(error.message).toBe('OLM.INVALID_BASE64'); return false; } } aliceSession.create_outbound( aliceAccount, JSON.parse(bobAccount.identity_keys()).curve25519, JSON.parse(bobAccount.identity_keys()).ed25519, String(Object.values(JSON.parse(bobAccount.prekey()).curve25519)[0]), String(bobAccount.prekey_signature()), bobOneTimeKeys[otk_id], ); return aliceSession; }; - const testRatchet = (aliceSession, bobSession, bobAccount, num_msg = 1) => { + const testRatchet = ( + aliceSession: olm.Session, + bobSession: olm.Session, + bobAccount: olm.Account, + num_msg: number = 1, + ) => { let test_text = randomString(40); let encrypted = aliceSession.encrypt(test_text); expect(encrypted.type).toEqual(0); try { bobSession.create_inbound(bobAccount, encrypted.body); } catch (error) { expect(error.message).toBe('OLM.BAD_MESSAGE_KEY_ID'); return false; } bobAccount.remove_one_time_keys(bobSession); let decrypted = bobSession.decrypt(encrypted.type, encrypted.body); expect(decrypted).toEqual(test_text); test_text = randomString(40); encrypted = bobSession.encrypt(test_text); expect(encrypted.type).toEqual(1); decrypted = aliceSession.decrypt(encrypted.type, encrypted.body); expect(decrypted).toEqual(test_text); for (let index = 1; index < num_msg; index++) { test_text = randomString(40); encrypted = aliceSession.encrypt(test_text); expect(encrypted.type).toEqual(1); decrypted = bobSession.decrypt(encrypted.type, encrypted.body); expect(decrypted).toEqual(test_text); test_text = randomString(40); encrypted = bobSession.encrypt(test_text); expect(encrypted.type).toEqual(1); decrypted = aliceSession.decrypt(encrypted.type, encrypted.body); expect(decrypted).toEqual(test_text); } return true; }; it('should get Olm Utility', async () => { await olm.init(); const utility = getOlmUtility(); expect(utility).toBeDefined(); }); it('should generate, regenerate, forget, and publish prekey', async () => { await olm.init(); const account = initAccount(false); expect(account.last_prekey_publish_time()).toEqual(0); expect(account.prekey()).toBeDefined(); expect(account.unpublished_prekey()).toBeDefined(); account.mark_prekey_as_published(); const last_published = account.last_prekey_publish_time(); expect(last_published).toBeGreaterThan(0); try { console.log(account.unpublished_prekey()); } catch (error) { expect(error.message).toContain('NO_UNPUBLISHED_PREKEY'); } account.forget_old_prekey(); account.generate_prekey(); expect(account.prekey()).toBeDefined(); expect(account.unpublished_prekey()).toBeDefined(); expect(account.last_prekey_publish_time()).toEqual(last_published); account.mark_prekey_as_published(); expect(account.last_prekey_publish_time()).toBeGreaterThanOrEqual( last_published, ); account.forget_old_prekey(); }); it('should encrypt and decrypt', async () => { await olm.init(); const aliceAccount = initAccount(); const bobAccount = initAccount(); const aliceSession = new olm.Session(); const bobSession = new olm.Session(); createSession(aliceSession, aliceAccount, bobAccount); expect(testRatchet(aliceSession, bobSession, bobAccount)).toBeTrue; }); it('should encrypt and decrypt, even after a prekey is rotated', async () => { await olm.init(); const aliceAccount = initAccount(); const bobAccount = initAccount(); const aliceSession = new olm.Session(); const bobSession = new olm.Session(); createSession(aliceSession, aliceAccount, bobAccount, true); expect(testRatchet(aliceSession, bobSession, bobAccount)).toBeTrue; }); it('should not encrypt and decrypt, after the old prekey is forgotten', async () => { await olm.init(); const aliceAccount = initAccount(); const bobAccount = initAccount(); const aliceSession = new olm.Session(); const bobSession = new olm.Session(); createSession(aliceSession, aliceAccount, bobAccount, true, true); expect(testRatchet(aliceSession, bobSession, bobAccount)).toBeFalse; }); it('should encrypt and decrypt repeatedly', async () => { await olm.init(); const aliceAccount = initAccount(); const bobAccount = initAccount(); const aliceSession = new olm.Session(); const bobSession = new olm.Session(); createSession(aliceSession, aliceAccount, bobAccount, false, false); expect(testRatchet(aliceSession, bobSession, bobAccount, 100)).toBeTrue; }); it('should not encrypt and decrypt if prekey is not signed correctly', async () => { await olm.init(); const aliceAccount = initAccount(); const bobAccount = initAccount(); const aliceSession = new olm.Session(); expect( createSession(aliceSession, aliceAccount, bobAccount, false, false, true), ).toBeFalse; }); });