Changeset View
Changeset View
Standalone View
Standalone View
keyserver/src/creators/message-creator.js
Show All 32 Lines | import { | ||||
mergeOrConditions, | mergeOrConditions, | ||||
} from '../database/database.js'; | } from '../database/database.js'; | ||||
import { | import { | ||||
fetchMessageInfoForLocalID, | fetchMessageInfoForLocalID, | ||||
fetchMessageInfoByID, | fetchMessageInfoByID, | ||||
} from '../fetchers/message-fetchers.js'; | } from '../fetchers/message-fetchers.js'; | ||||
import { fetchOtherSessionsForViewer } from '../fetchers/session-fetchers.js'; | import { fetchOtherSessionsForViewer } from '../fetchers/session-fetchers.js'; | ||||
import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; | import { fetchServerThreadInfos } from '../fetchers/thread-fetchers.js'; | ||||
import { sendPushNotifs } from '../push/send.js'; | import { sendPushNotifs, sendRescindNotifs } from '../push/send.js'; | ||||
import { handleAsyncPromise } from '../responders/handlers.js'; | import { handleAsyncPromise } from '../responders/handlers.js'; | ||||
import type { Viewer } from '../session/viewer.js'; | import type { Viewer } from '../session/viewer.js'; | ||||
import { earliestFocusedTimeConsideredExpired } from '../shared/focused-times.js'; | import { earliestFocusedTimeConsideredExpired } from '../shared/focused-times.js'; | ||||
import { publisher } from '../socket/redis.js'; | import { publisher } from '../socket/redis.js'; | ||||
import { creationString } from '../utils/idempotent.js'; | import { creationString } from '../utils/idempotent.js'; | ||||
type UserThreadInfo = { | type UserThreadInfo = { | ||||
+devices: Map< | +devices: Map< | ||||
▲ Show 20 Lines • Show All 93 Lines • ▼ Show 20 Lines | for (let i = 0; i < messageDatas.length; i++) { | ||||
messageIndices.push(newMessageIndex); | messageIndices.push(newMessageIndex); | ||||
const serverID = ids[newMessageIndex]; | const serverID = ids[newMessageIndex]; | ||||
if (messageData.type === messageTypes.CREATE_SUB_THREAD) { | if (messageData.type === messageTypes.CREATE_SUB_THREAD) { | ||||
subthreadPermissionsToCheck.add(messageData.childThreadID); | subthreadPermissionsToCheck.add(messageData.childThreadID); | ||||
} | } | ||||
const content = | const content = messageSpecs[messageData.type].messageContentForServerDB?.( | ||||
messageSpecs[messageData.type].messageContentForServerDB?.(messageData); | messageData, | ||||
); | |||||
const creation = | const creation = | ||||
messageData.localID && viewer.hasSessionInfo | messageData.localID && viewer.hasSessionInfo | ||||
? creationString(viewer, messageData.localID) | ? creationString(viewer, messageData.localID) | ||||
: null; | : null; | ||||
const targetMessageID = messageData.targetMessageID | const targetMessageID = messageData.targetMessageID | ||||
? messageData.targetMessageID | ? messageData.targetMessageID | ||||
▲ Show 20 Lines • Show All 227 Lines • ▼ Show 20 Lines | for (const row of result) { | ||||
if (!focusedUser) { | if (!focusedUser) { | ||||
thisUserInfo.notFocusedThreadIDs.add(threadID); | thisUserInfo.notFocusedThreadIDs.add(threadID); | ||||
} | } | ||||
} | } | ||||
const messageInfosPerUser = {}; | const messageInfosPerUser = {}; | ||||
const latestMessagesPerUser: LatestMessagesPerUser = new Map(); | const latestMessagesPerUser: LatestMessagesPerUser = new Map(); | ||||
const userPushInfoPromises = {}; | const userPushInfoPromises = {}; | ||||
const userRescindInfoPromises = {}; | |||||
for (const pair of perUserInfo) { | for (const pair of perUserInfo) { | ||||
const [userID, preUserPushInfo] = pair; | const [userID, preUserPushInfo] = pair; | ||||
const userMessageInfos = []; | const userMessageInfos = []; | ||||
for (const threadID of preUserPushInfo.threadIDs) { | for (const threadID of preUserPushInfo.threadIDs) { | ||||
const messageIndices = threadsToMessageIndices.get(threadID); | const messageIndices = threadsToMessageIndices.get(threadID); | ||||
invariant(messageIndices, `indices should exist for thread ${threadID}`); | invariant(messageIndices, `indices should exist for thread ${threadID}`); | ||||
for (const messageIndex of messageIndices) { | for (const messageIndex of messageIndices) { | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | const generateNotifUserInfoPromise = (pushType: PushType) => { | ||||
return { | return { | ||||
devices: userDevices, | devices: userDevices, | ||||
messageInfos: filteredNotifMessageInfos, | messageInfos: filteredNotifMessageInfos, | ||||
}; | }; | ||||
})(); | })(); | ||||
}; | }; | ||||
const userPushInfoPromise = generateNotifUserInfoPromise(pushTypes.NOTIF); | const userPushInfoPromise = generateNotifUserInfoPromise(pushTypes.NOTIF); | ||||
const userRescindInfoPromise = generateNotifUserInfoPromise( | |||||
pushTypes.RESCIND, | |||||
); | |||||
userPushInfoPromises[userID] = userPushInfoPromise; | userPushInfoPromises[userID] = userPushInfoPromise; | ||||
userRescindInfoPromises[userID] = userRescindInfoPromise; | |||||
} | } | ||||
const latestMessages = flattenLatestMessagesPerUser(latestMessagesPerUser); | const latestMessages = flattenLatestMessagesPerUser(latestMessagesPerUser); | ||||
const [pushInfo] = await Promise.all([ | const [pushInfo, rescindInfo] = await Promise.all([ | ||||
promiseAll(userPushInfoPromises), | promiseAll(userPushInfoPromises), | ||||
promiseAll(userRescindInfoPromises), | |||||
createReadStatusUpdates(latestMessages), | createReadStatusUpdates(latestMessages), | ||||
redisPublish(viewer, messageInfosPerUser, updatesForCurrentSession), | redisPublish(viewer, messageInfosPerUser, updatesForCurrentSession), | ||||
updateLatestMessages(latestMessages), | updateLatestMessages(latestMessages), | ||||
]); | ]); | ||||
await sendPushNotifs(_pickBy(Boolean)(pushInfo)); | await sendPushNotifs(_pickBy(Boolean)(pushInfo)); | ||||
await sendRescindNotifs(_pickBy(Boolean)(rescindInfo)); | |||||
} | } | ||||
async function redisPublish( | async function redisPublish( | ||||
viewer: Viewer, | viewer: Viewer, | ||||
messageInfosPerUser: { [userID: string]: $ReadOnlyArray<RawMessageInfo> }, | messageInfosPerUser: { [userID: string]: $ReadOnlyArray<RawMessageInfo> }, | ||||
updatesForCurrentSession: UpdatesForCurrentSession, | updatesForCurrentSession: UpdatesForCurrentSession, | ||||
) { | ) { | ||||
const avoidBroadcastingToCurrentSession = | const avoidBroadcastingToCurrentSession = | ||||
Show All 28 Lines | |||||
} | } | ||||
function determineLatestMessagesPerThread( | function determineLatestMessagesPerThread( | ||||
preUserPushInfo: UserThreadInfo, | preUserPushInfo: UserThreadInfo, | ||||
userID: string, | userID: string, | ||||
threadsToMessageIndices: $ReadOnlyMap<string, $ReadOnlyArray<number>>, | threadsToMessageIndices: $ReadOnlyMap<string, $ReadOnlyArray<number>>, | ||||
messageInfos: $ReadOnlyArray<RawMessageInfo>, | messageInfos: $ReadOnlyArray<RawMessageInfo>, | ||||
) { | ) { | ||||
const { threadIDs, notFocusedThreadIDs, subthreadsCanSetToUnread } = | const { | ||||
preUserPushInfo; | threadIDs, | ||||
notFocusedThreadIDs, | |||||
subthreadsCanSetToUnread, | |||||
} = preUserPushInfo; | |||||
const latestMessagesPerThread = new Map(); | const latestMessagesPerThread = new Map(); | ||||
for (const threadID of threadIDs) { | for (const threadID of threadIDs) { | ||||
const messageIndices = threadsToMessageIndices.get(threadID); | const messageIndices = threadsToMessageIndices.get(threadID); | ||||
invariant(messageIndices, `indices should exist for thread ${threadID}`); | invariant(messageIndices, `indices should exist for thread ${threadID}`); | ||||
for (const messageIndex of messageIndices) { | for (const messageIndex of messageIndices) { | ||||
const messageInfo = messageInfos[messageIndex]; | const messageInfo = messageInfos[messageIndex]; | ||||
if ( | if ( | ||||
messageInfo.type === messageTypes.CREATE_SUB_THREAD && | messageInfo.type === messageTypes.CREATE_SUB_THREAD && | ||||
▲ Show 20 Lines • Show All 122 Lines • Show Last 20 Lines |