diff --git a/lib/shared/dm-ops/change-thread-settings-spec.js b/lib/shared/dm-ops/change-thread-settings-spec.js index 6afbda2c6..192a5f813 100644 --- a/lib/shared/dm-ops/change-thread-settings-spec.js +++ b/lib/shared/dm-ops/change-thread-settings-spec.js @@ -1,215 +1,251 @@ // @flow import invariant from 'invariant'; import uuid from 'uuid'; import type { DMOperationSpec, ProcessDMOperationUtilities, + ProcessingPossibilityCheckResult, } from './dm-op-spec.js'; import type { DMBlobOperation, DMChangeThreadSettingsOperation, DMThreadSettingsChanges, } from '../../types/dm-ops.js'; import type { RawMessageInfo } from '../../types/message-types'; import { messageTypes } from '../../types/message-types-enum.js'; import type { ChangeSettingsMessageData } from '../../types/messages/change-settings.js'; import type { RawThreadInfo, ThickRawThreadInfo, } from '../../types/minimally-encoded-thread-permissions-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ClientUpdateInfo } from '../../types/update-types.js'; import { blobHashFromBlobServiceURI } from '../../utils/blob-service.js'; import { values } from '../../utils/objects.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; function getThreadIDFromChangeThreadSettingsDMOp( dmOperation: DMChangeThreadSettingsOperation, ): string { return dmOperation.type === 'change_thread_settings' ? dmOperation.threadID : dmOperation.existingThreadDetails.threadID; } function createChangeSettingsMessageDatasAndUpdate( dmOperation: DMChangeThreadSettingsOperation, ): { +fieldNameToMessageData: { +[fieldName: string]: { +messageData: ChangeSettingsMessageData, +rawMessageInfo: RawMessageInfo, }, }, +threadInfoUpdate: DMThreadSettingsChanges, } { const { changes, editorID, time, messageIDsPrefix } = dmOperation; const { name, description, color, avatar } = changes; const threadID = getThreadIDFromChangeThreadSettingsDMOp(dmOperation); const threadInfoUpdate: { ...DMThreadSettingsChanges } = {}; if (name !== undefined && name !== null) { threadInfoUpdate.name = name; } if (description !== undefined && description !== null) { threadInfoUpdate.description = description; } if (color) { threadInfoUpdate.color = color; } if (avatar || avatar === null) { threadInfoUpdate.avatar = avatar; } const fieldNameToMessageData: { [fieldName: string]: { +messageData: ChangeSettingsMessageData, +rawMessageInfo: RawMessageInfo, }, } = {}; const { avatar: avatarObject, ...rest } = threadInfoUpdate; let normalizedThreadInfoUpdate; if (avatarObject) { normalizedThreadInfoUpdate = { ...rest, avatar: JSON.stringify(avatarObject), }; } else if (avatarObject === null) { // clear thread avatar normalizedThreadInfoUpdate = { ...rest, avatar: '' }; } else { normalizedThreadInfoUpdate = { ...rest }; } for (const fieldName in normalizedThreadInfoUpdate) { const value = normalizedThreadInfoUpdate[fieldName]; const messageData: ChangeSettingsMessageData = { type: messageTypes.CHANGE_SETTINGS, threadID, creatorID: editorID, time, field: fieldName, value: value, }; const rawMessageInfo = rawMessageInfoFromMessageData( messageData, `${messageIDsPrefix}/${fieldName}`, ); fieldNameToMessageData[fieldName] = { messageData, rawMessageInfo }; } return { fieldNameToMessageData, threadInfoUpdate }; } function getBlobOpsFromOperation( dmOperation: DMChangeThreadSettingsOperation, threadInfo: ?RawThreadInfo, ): Array { const ops: Array = []; const prevAvatar = threadInfo?.avatar; if (prevAvatar && prevAvatar.type === 'encrypted_image') { ops.push({ type: 'remove_holder', blobHash: blobHashFromBlobServiceURI(prevAvatar.blobURI), dmOpType: 'inbound_and_outbound', }); } const { avatar } = dmOperation.changes; if (avatar && avatar?.type === 'encrypted_image') { ops.push({ type: 'establish_holder', blobHash: blobHashFromBlobServiceURI(avatar.blobURI), dmOpType: 'inbound_only', }); } return ops; } const changeThreadSettingsSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async ( dmOperation: DMChangeThreadSettingsOperation, ) => { const { fieldNameToMessageData } = createChangeSettingsMessageDatasAndUpdate(dmOperation); return { messageDatasWithMessageInfos: values(fieldNameToMessageData) }; }, processDMOperation: async ( dmOperation: DMChangeThreadSettingsOperation, utilities: ProcessDMOperationUtilities, ) => { const { time } = dmOperation; const threadID = getThreadIDFromChangeThreadSettingsDMOp(dmOperation); const threadInfo: ?RawThreadInfo = utilities.threadInfos[threadID]; const updateInfos: Array = []; const { fieldNameToMessageData, threadInfoUpdate } = createChangeSettingsMessageDatasAndUpdate(dmOperation); const blobOps = getBlobOpsFromOperation(dmOperation, threadInfo); const messageDataWithMessageInfoPairs = values(fieldNameToMessageData); const rawMessageInfos = messageDataWithMessageInfoPairs.map( ({ rawMessageInfo }) => rawMessageInfo, ); invariant(threadInfo?.thick, 'Thread should be thick'); let threadInfoToUpdate: ThickRawThreadInfo = threadInfo; for (const fieldName in threadInfoUpdate) { const timestamp = threadInfoToUpdate.timestamps[fieldName]; if (timestamp < time) { threadInfoToUpdate = { ...threadInfoToUpdate, [fieldName]: threadInfoUpdate[fieldName], timestamps: { ...threadInfoToUpdate.timestamps, [fieldName]: time, }, }; } } if (messageDataWithMessageInfoPairs.length > 0) { updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time, threadInfo: threadInfoToUpdate, }); } return { rawMessageInfos, updateInfos, blobOps, }; }, canBeProcessed: async ( dmOperation: DMChangeThreadSettingsOperation, utilities: ProcessDMOperationUtilities, ) => { - if (utilities.threadInfos[dmOperation.threadID]) { + const { fieldNameToMessageData } = + createChangeSettingsMessageDatasAndUpdate(dmOperation); + const messageIDConflictCheckPromises: $ReadOnlyArray< + Promise, + > = values(fieldNameToMessageData).map(async ({ rawMessageInfo }) => { + if (!rawMessageInfo.id) { + return { isProcessingPossible: true }; + } + const messageID = rawMessageInfo.id; + const message = await utilities.fetchMessage(messageID); + if (message) { + console.log( + `Discarded a ${dmOperation.type} operation because ` + + `message with the same ID ${messageID} already exists ` + + 'in the store', + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } return { isProcessingPossible: true }; + }); + + const messageIDConflictCheckResults = await Promise.all( + messageIDConflictCheckPromises, + ); + for (const result of messageIDConflictCheckResults) { + if (!result.isProcessingPossible) { + return result; + } } - return { - isProcessingPossible: false, - reason: { - type: 'missing_thread', - threadID: dmOperation.threadID, - }, - }; + + if (!utilities.threadInfos[dmOperation.threadID]) { + return { + isProcessingPossible: false, + reason: { + type: 'missing_thread', + threadID: dmOperation.threadID, + }, + }; + } + return { isProcessingPossible: true }; }, supportsAutoRetry: true, }); export { changeThreadSettingsSpec, createChangeSettingsMessageDatasAndUpdate }; diff --git a/lib/shared/dm-ops/create-entry-spec.js b/lib/shared/dm-ops/create-entry-spec.js index 6d5a1d59a..2139a9a23 100644 --- a/lib/shared/dm-ops/create-entry-spec.js +++ b/lib/shared/dm-ops/create-entry-spec.js @@ -1,96 +1,108 @@ // @flow import uuid from 'uuid'; import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import type { DMCreateEntryOperation } from '../../types/dm-ops.js'; import type { ThickRawEntryInfo } from '../../types/entry-types.js'; import { messageTypes } from '../../types/message-types-enum.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { EntryUpdateInfo } from '../../types/update-types.js'; import { dateFromString } from '../../utils/date-utils.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; function createMessageDataWithInfoFromDMOperation( dmOperation: DMCreateEntryOperation, ) { const { threadID, creatorID, time, entryID, entryDate, text, messageID } = dmOperation; const messageData = { type: messageTypes.CREATE_ENTRY, threadID, creatorID, time, entryID, date: entryDate, text, }; const rawMessageInfo = rawMessageInfoFromMessageData(messageData, messageID); return { rawMessageInfo, messageData }; } const createEntrySpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async (dmOperation: DMCreateEntryOperation) => { return { messageDatasWithMessageInfos: [ createMessageDataWithInfoFromDMOperation(dmOperation), ], }; }, processDMOperation: async (dmOperation: DMCreateEntryOperation) => { const { threadID, creatorID, time, entryID, entryDate, text } = dmOperation; const { rawMessageInfo } = createMessageDataWithInfoFromDMOperation(dmOperation); const rawMessageInfos = [rawMessageInfo]; const date = dateFromString(entryDate); const rawEntryInfo: ThickRawEntryInfo = { id: entryID, threadID, text, year: date.getFullYear(), month: date.getMonth() + 1, day: date.getDate(), creationTime: time, creatorID, thick: true, deleted: false, lastUpdatedTime: time, }; const entryUpdateInfo: EntryUpdateInfo = { entryInfo: rawEntryInfo, type: updateTypes.UPDATE_ENTRY, id: uuid.v4(), time, }; return { rawMessageInfos, updateInfos: [entryUpdateInfo], blobOps: [], }; }, canBeProcessed: async ( dmOperation: DMCreateEntryOperation, utilities: ProcessDMOperationUtilities, ) => { + if (utilities.entryInfos[dmOperation.entryID]) { + console.log( + 'Discarded a CREATE_ENTRY operation because entry with ' + + `the same ID ${dmOperation.entryID} already exists in the store`, + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } if (utilities.threadInfos[dmOperation.threadID]) { return { isProcessingPossible: true }; } return { isProcessingPossible: false, reason: { type: 'missing_thread', threadID: dmOperation.threadID, }, }; }, supportsAutoRetry: true, }); export { createEntrySpec }; diff --git a/lib/shared/dm-ops/create-sidebar-spec.js b/lib/shared/dm-ops/create-sidebar-spec.js index ba7581124..eff6e8014 100644 --- a/lib/shared/dm-ops/create-sidebar-spec.js +++ b/lib/shared/dm-ops/create-sidebar-spec.js @@ -1,208 +1,256 @@ // @flow import uuid from 'uuid'; import { createThickRawThreadInfo } from './create-thread-spec.js'; import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import type { DMCreateSidebarOperation } from '../../types/dm-ops.js'; import { messageTypes } from '../../types/message-types-enum.js'; import { type RawMessageInfo, messageTruncationStatus, } from '../../types/message-types.js'; import { joinThreadSubscription } from '../../types/subscription-types.js'; import { threadTypes } from '../../types/thread-types-enum.js'; import { updateTypes } from '../../types/update-types-enum.js'; import { generatePendingThreadColor } from '../color-utils.js'; import { isInvalidSidebarSource, rawMessageInfoFromMessageData, } from '../message-utils.js'; import { createThreadTimestamps } from '../thread-utils.js'; async function createMessageDatasWithInfosFromDMOperation( dmOperation: DMCreateSidebarOperation, utilities: ProcessDMOperationUtilities, threadColor?: string, ) { const { threadID, creatorID, time, parentThreadID, memberIDs, sourceMessageID, newSidebarSourceMessageID, newCreateSidebarMessageID, } = dmOperation; const allMemberIDs = [creatorID, ...memberIDs]; const color = threadColor ?? generatePendingThreadColor(allMemberIDs); const sourceMessage = await utilities.fetchMessage(sourceMessageID); if (!sourceMessage) { throw new Error( `could not find sourceMessage ${sourceMessageID}... probably ` + 'joined thick thread ${parentThreadID} after its creation', ); } if (isInvalidSidebarSource(sourceMessage)) { throw new Error( `sourceMessage ${sourceMessageID} is an invalid sidebar source`, ); } const sidebarSourceMessageData = { type: messageTypes.SIDEBAR_SOURCE, threadID, creatorID, time, sourceMessage: sourceMessage, }; const createSidebarMessageData = { type: messageTypes.CREATE_SIDEBAR, threadID, creatorID, time: time + 1, sourceMessageAuthorID: sourceMessage.creatorID, initialThreadState: { parentThreadID, color, memberIDs: allMemberIDs, }, }; const sidebarSourceMessageInfo = rawMessageInfoFromMessageData( sidebarSourceMessageData, newSidebarSourceMessageID, ); const createSidebarMessageInfo = rawMessageInfoFromMessageData( createSidebarMessageData, newCreateSidebarMessageID, ); return { sidebarSourceMessageData, createSidebarMessageData, sidebarSourceMessageInfo, createSidebarMessageInfo, }; } const createSidebarSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async ( dmOperation: DMCreateSidebarOperation, utilities: ProcessDMOperationUtilities, ) => { const { sidebarSourceMessageData, createSidebarMessageData, createSidebarMessageInfo, sidebarSourceMessageInfo, } = await createMessageDatasWithInfosFromDMOperation( dmOperation, utilities, ); return { messageDatasWithMessageInfos: [ { messageData: sidebarSourceMessageData, rawMessageInfo: sidebarSourceMessageInfo, }, { messageData: createSidebarMessageData, rawMessageInfo: createSidebarMessageInfo, }, ], }; }, processDMOperation: async ( dmOperation: DMCreateSidebarOperation, utilities: ProcessDMOperationUtilities, ) => { const { threadID, creatorID, time, parentThreadID, memberIDs, sourceMessageID, roleID, } = dmOperation; const { viewerID } = utilities; const allMemberIDs = [creatorID, ...memberIDs]; const allMemberIDsWithSubscriptions = allMemberIDs.map(id => ({ id, subscription: joinThreadSubscription, })); const rawThreadInfo = createThickRawThreadInfo( { threadID, threadType: threadTypes.THICK_SIDEBAR, creationTime: time, parentThreadID, allMemberIDsWithSubscriptions, roleID, unread: creatorID !== viewerID, sourceMessageID, containingThreadID: parentThreadID, timestamps: createThreadTimestamps(time, allMemberIDs), }, utilities, ); const { sidebarSourceMessageInfo, createSidebarMessageInfo } = await createMessageDatasWithInfosFromDMOperation( dmOperation, utilities, rawThreadInfo.color, ); const rawMessageInfos: Array = [ sidebarSourceMessageInfo, createSidebarMessageInfo, ]; const threadJoinUpdateInfo = { type: updateTypes.JOIN_THREAD, id: uuid.v4(), time, threadInfo: rawThreadInfo, rawMessageInfos, truncationStatus: messageTruncationStatus.EXHAUSTIVE, rawEntryInfos: [], }; return { rawMessageInfos: [], // included in updateInfos below updateInfos: [threadJoinUpdateInfo], blobOps: [], }; }, canBeProcessed: async ( dmOperation: DMCreateSidebarOperation, utilities: ProcessDMOperationUtilities, ) => { - const sourceMessage = await utilities.fetchMessage( - dmOperation.sourceMessageID, - ); + if (utilities.threadInfos[dmOperation.threadID]) { + console.log( + 'Discarded a CREATE_SIDEBAR operation because thread ' + + `with the same ID ${dmOperation.threadID} already exists ` + + 'in the store', + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } + + const [createSidebarMessage, sidebarSourceMessage, sourceMessage] = + await Promise.all( + [ + dmOperation.newCreateSidebarMessageID, + dmOperation.newSidebarSourceMessageID, + dmOperation.sourceMessageID, + ].map(utilities.fetchMessage), + ); + + if (createSidebarMessage) { + console.log( + `Discarded a ${dmOperation.type} operation because ` + + `message with the same ID ${dmOperation.newCreateSidebarMessageID} ` + + 'already exists in the store', + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } + + if (sidebarSourceMessage) { + console.log( + `Discarded a ${dmOperation.type} operation because ` + + `message with the same ID ${dmOperation.newSidebarSourceMessageID} ` + + 'already exists in the store', + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } + if (!sourceMessage) { return { isProcessingPossible: false, reason: { type: 'missing_message', messageID: dmOperation.sourceMessageID, }, }; } return { isProcessingPossible: true }; }, supportsAutoRetry: true, }); export { createSidebarSpec }; diff --git a/lib/shared/dm-ops/create-thread-spec.js b/lib/shared/dm-ops/create-thread-spec.js index 25eb92e42..abe4f0f01 100644 --- a/lib/shared/dm-ops/create-thread-spec.js +++ b/lib/shared/dm-ops/create-thread-spec.js @@ -1,303 +1,319 @@ // @flow import invariant from 'invariant'; import uuid from 'uuid'; import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import { specialRoles } from '../../permissions/special-roles.js'; import { getAllThreadPermissions, makePermissionsBlob, getThickThreadRolePermissionsBlob, makePermissionsForChildrenBlob, } from '../../permissions/thread-permissions.js'; import type { CreateThickRawThreadInfoInput, DMCreateThreadOperation, } from '../../types/dm-ops.js'; import { messageTypes } from '../../types/message-types-enum.js'; import { messageTruncationStatus } from '../../types/message-types.js'; import { type ThickRawThreadInfo, type RoleInfo, minimallyEncodeMemberInfo, minimallyEncodeRoleInfo, minimallyEncodeThreadCurrentUserInfo, } from '../../types/minimally-encoded-thread-permissions-types.js'; import { joinThreadSubscription } from '../../types/subscription-types.js'; import type { ThreadPermissionsInfo } from '../../types/thread-permission-types.js'; import type { ThickThreadType } from '../../types/thread-types-enum.js'; import type { ThickMemberInfo } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import { generatePendingThreadColor } from '../color-utils.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; import { createThreadTimestamps } from '../thread-utils.js'; function createPermissionsInfo( threadID: string, threadType: ThickThreadType, isMember: boolean, parentThreadInfo: ?ThickRawThreadInfo, ): ThreadPermissionsInfo { let rolePermissions = null; if (isMember) { rolePermissions = getThickThreadRolePermissionsBlob(threadType); } let permissionsFromParent = null; if (parentThreadInfo) { const parentThreadRolePermissions = getThickThreadRolePermissionsBlob( parentThreadInfo.type, ); const parentPermissionsBlob = makePermissionsBlob( parentThreadRolePermissions, null, parentThreadInfo.id, parentThreadInfo.type, ); permissionsFromParent = makePermissionsForChildrenBlob( parentPermissionsBlob, ); } return getAllThreadPermissions( makePermissionsBlob( rolePermissions, permissionsFromParent, threadID, threadType, ), threadID, ); } function createRoleAndPermissionForThickThreads( threadType: ThickThreadType, threadID: string, roleID: string, parentThreadInfo: ?ThickRawThreadInfo, ): { +role: RoleInfo, +membershipPermissions: ThreadPermissionsInfo } { const rolePermissions = getThickThreadRolePermissionsBlob(threadType); const membershipPermissions = createPermissionsInfo( threadID, threadType, true, parentThreadInfo, ); const role: RoleInfo = { ...minimallyEncodeRoleInfo({ id: roleID, name: 'Members', permissions: rolePermissions, isDefault: true, }), specialRole: specialRoles.DEFAULT_ROLE, }; return { membershipPermissions, role, }; } type MutableThickRawThreadInfo = { ...ThickRawThreadInfo }; function createThickRawThreadInfo( input: CreateThickRawThreadInfoInput, utilities: ProcessDMOperationUtilities, ): MutableThickRawThreadInfo { const { threadID, threadType, creationTime, parentThreadID, allMemberIDsWithSubscriptions, roleID, unread, name, avatar, description, color, containingThreadID, sourceMessageID, repliesCount, pinnedCount, timestamps, } = input; const memberIDs = allMemberIDsWithSubscriptions.map(({ id }) => id); const threadColor = color ?? generatePendingThreadColor(memberIDs); const parentThreadInfo = parentThreadID ? utilities.threadInfos[parentThreadID] : null; if (parentThreadID && !parentThreadInfo) { console.log( `Parent thread with ID ${parentThreadID} was expected while creating ` + 'thick thread but is missing from the store', ); } invariant( !parentThreadInfo || parentThreadInfo.thick, 'Parent thread should be thick', ); const { membershipPermissions, role } = createRoleAndPermissionForThickThreads( threadType, threadID, roleID, parentThreadInfo, ); const viewerIsMember = allMemberIDsWithSubscriptions.some( member => member.id === utilities.viewerID, ); const viewerRoleID = viewerIsMember ? role.id : null; const viewerMembershipPermissions = createPermissionsInfo( threadID, threadType, viewerIsMember, parentThreadInfo, ); const newThread: MutableThickRawThreadInfo = { thick: true, minimallyEncoded: true, id: threadID, type: threadType, color: threadColor, creationTime, parentThreadID, members: allMemberIDsWithSubscriptions.map( ({ id: memberID, subscription }) => minimallyEncodeMemberInfo({ id: memberID, role: memberID === utilities.viewerID ? viewerRoleID : role.id, permissions: memberID === utilities.viewerID ? viewerMembershipPermissions : membershipPermissions, isSender: memberID === utilities.viewerID, subscription, }), ), roles: { [role.id]: role, }, currentUser: minimallyEncodeThreadCurrentUserInfo({ role: viewerRoleID, permissions: viewerMembershipPermissions, subscription: joinThreadSubscription, unread, }), repliesCount: repliesCount ?? 0, name, avatar, description: description ?? '', containingThreadID, timestamps, }; if (sourceMessageID) { newThread.sourceMessageID = sourceMessageID; } if (pinnedCount) { newThread.pinnedCount = pinnedCount; } return newThread; } function createMessageDataWithInfoFromDMOperation( dmOperation: DMCreateThreadOperation, ) { const { threadID, creatorID, time, threadType, memberIDs, newMessageID } = dmOperation; const allMemberIDs = [creatorID, ...memberIDs]; const color = generatePendingThreadColor(allMemberIDs); const messageData = { type: messageTypes.CREATE_THREAD, threadID, creatorID, time, initialThreadState: { type: threadType, color, memberIDs: allMemberIDs, }, }; const rawMessageInfo = rawMessageInfoFromMessageData( messageData, newMessageID, ); return { messageData, rawMessageInfo }; } const createThreadSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async (dmOperation: DMCreateThreadOperation) => { return { messageDatasWithMessageInfos: [ createMessageDataWithInfoFromDMOperation(dmOperation), ], }; }, processDMOperation: async ( dmOperation: DMCreateThreadOperation, utilities: ProcessDMOperationUtilities, ) => { const { threadID, creatorID, time, threadType, memberIDs, roleID } = dmOperation; const { viewerID } = utilities; const allMemberIDs = [creatorID, ...memberIDs]; const allMemberIDsWithSubscriptions = allMemberIDs.map(id => ({ id, subscription: joinThreadSubscription, })); const rawThreadInfo = createThickRawThreadInfo( { threadID, threadType, creationTime: time, allMemberIDsWithSubscriptions, roleID, unread: creatorID !== viewerID, timestamps: createThreadTimestamps(time, allMemberIDs), }, utilities, ); const { rawMessageInfo } = createMessageDataWithInfoFromDMOperation(dmOperation); const rawMessageInfos = [rawMessageInfo]; const threadJoinUpdateInfo = { type: updateTypes.JOIN_THREAD, id: uuid.v4(), time, threadInfo: rawThreadInfo, rawMessageInfos, truncationStatus: messageTruncationStatus.EXHAUSTIVE, rawEntryInfos: [], }; return { rawMessageInfos: [], // included in updateInfos below updateInfos: [threadJoinUpdateInfo], blobOps: [], }; }, - canBeProcessed: async () => { + canBeProcessed: async ( + dmOperation: DMCreateThreadOperation, + utilities: ProcessDMOperationUtilities, + ) => { + if (utilities.threadInfos[dmOperation.threadID]) { + console.log( + 'Discarded a CREATE_THREAD operation because thread ' + + `with the same ID ${dmOperation.threadID} already exists ` + + 'in the store', + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } return { isProcessingPossible: true }; }, supportsAutoRetry: true, }); export { createThickRawThreadInfo, createThreadSpec, createRoleAndPermissionForThickThreads, createPermissionsInfo, }; diff --git a/lib/shared/dm-ops/dm-op-spec.js b/lib/shared/dm-ops/dm-op-spec.js index 08622f4d0..318df27d4 100644 --- a/lib/shared/dm-ops/dm-op-spec.js +++ b/lib/shared/dm-ops/dm-op-spec.js @@ -1,47 +1,47 @@ // @flow import type { DMOperation, DMOperationResult } from '../../types/dm-ops.js'; import type { RawEntryInfos } from '../../types/entry-types.js'; import type { UserIdentitiesResponse } from '../../types/identity-service-types.js'; import type { RawMessageInfo } from '../../types/message-types.js'; import type { NotificationsCreationData } from '../../types/notif-types.js'; import type { RawThreadInfos } from '../../types/thread-types.js'; export type ProcessDMOperationUtilities = { +viewerID: string, // Needed to fetch sidebar source messages +fetchMessage: (messageID: string) => Promise, +threadInfos: RawThreadInfos, +entryInfos: RawEntryInfos, +findUserIdentities: ( userIDs: $ReadOnlyArray, ) => Promise, }; -type ProcessingPossibilityCheckResult = +export type ProcessingPossibilityCheckResult = | { +isProcessingPossible: true } | { +isProcessingPossible: false, +reason: | { +type: 'missing_thread', +threadID: string } | { +type: 'missing_entry', +entryID: string } | { +type: 'missing_message', +messageID: string } | { +type: 'missing_membership', +threadID: string, +userID: string } | { +type: 'invalid' }, }; export type DMOperationSpec = { +notificationsCreationData?: ( dmOp: DMOp, utilities: ProcessDMOperationUtilities, ) => Promise, +processDMOperation: ( dmOp: DMOp, utilities: ProcessDMOperationUtilities, ) => Promise, +canBeProcessed: ( dmOp: DMOp, utilities: ProcessDMOperationUtilities, ) => Promise, +supportsAutoRetry: boolean, }; diff --git a/lib/shared/dm-ops/dm-op-utils.js b/lib/shared/dm-ops/dm-op-utils.js index 63890314c..d79752e1c 100644 --- a/lib/shared/dm-ops/dm-op-utils.js +++ b/lib/shared/dm-ops/dm-op-utils.js @@ -1,367 +1,400 @@ // @flow import invariant from 'invariant'; import _groupBy from 'lodash/fp/groupBy.js'; import * as React from 'react'; import uuid from 'uuid'; -import { type ProcessDMOperationUtilities } from './dm-op-spec.js'; +import { + type ProcessDMOperationUtilities, + type ProcessingPossibilityCheckResult, +} from './dm-op-spec.js'; import { dmOpSpecs } from './dm-op-specs.js'; import { useProcessAndSendDMOperation } from './process-dm-ops.js'; import { mergeUpdatesWithMessageInfos } from '../../reducers/message-reducer.js'; import type { CreateThickRawThreadInfoInput, DMAddMembersOperation, DMAddViewerToThreadMembersOperation, DMOperation, ComposableDMOperation, } from '../../types/dm-ops.js'; import type { RawMessageInfo } from '../../types/message-types.js'; import type { RawThreadInfo, ThickRawThreadInfo, ThreadInfo, } from '../../types/minimally-encoded-thread-permissions-types.js'; import type { InboundActionMetadata } from '../../types/redux-types.js'; import { outboundP2PMessageStatuses, type OutboundP2PMessage, } from '../../types/sqlite-types.js'; import { assertThickThreadType, thickThreadTypes, } from '../../types/thread-types-enum.js'; import type { LegacyRawThreadInfo } from '../../types/thread-types.js'; import { type DMOperationP2PMessage, userActionsP2PMessageTypes, } from '../../types/tunnelbroker/user-actions-peer-to-peer-message-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import type { ClientUpdateInfo } from '../../types/update-types.js'; import { getContentSigningKey } from '../../utils/crypto-utils.js'; import { useSelector } from '../../utils/redux-utils.js'; import { messageSpecs } from '../messages/message-specs.js'; import { updateSpecs } from '../updates/update-specs.js'; function generateMessagesToPeers( message: DMOperation, peers: $ReadOnlyArray<{ +userID: string, +deviceID: string, }>, ): $ReadOnlyArray { const opMessage: DMOperationP2PMessage = { type: userActionsP2PMessageTypes.DM_OPERATION, op: message, }; const plaintext = JSON.stringify(opMessage); const outboundP2PMessages = []; for (const peer of peers) { const messageToPeer: OutboundP2PMessage = { messageID: uuid.v4(), deviceID: peer.deviceID, userID: peer.userID, timestamp: new Date().getTime().toString(), plaintext, ciphertext: '', status: outboundP2PMessageStatuses.persisted, supportsAutoRetry: dmOpSpecs[message.type].supportsAutoRetry, }; outboundP2PMessages.push(messageToPeer); } return outboundP2PMessages; } export const dmOperationSpecificationTypes = Object.freeze({ OUTBOUND: 'OutboundDMOperationSpecification', INBOUND: 'InboundDMOperationSpecification', }); type OutboundDMOperationSpecificationRecipients = | { +type: 'all_peer_devices' | 'self_devices' } | { +type: 'some_users', +userIDs: $ReadOnlyArray } | { +type: 'all_thread_members', +threadID: string } | { +type: 'some_devices', +deviceIDs: $ReadOnlyArray }; // The operation generated on the sending client, causes changes to // the state and broadcasting information to peers. export type OutboundDMOperationSpecification = { +type: 'OutboundDMOperationSpecification', +op: DMOperation, +recipients: OutboundDMOperationSpecificationRecipients, +sendOnly?: boolean, }; export type OutboundComposableDMOperationSpecification = { +type: 'OutboundDMOperationSpecification', +op: ComposableDMOperation, +recipients: OutboundDMOperationSpecificationRecipients, // Composable DM Ops are created only to be sent, locally we use // dedicated mechanism for updating the store. +sendOnly: true, +composableMessageID: string, }; // The operation received from other peers, causes changes to // the state and after processing, sends confirmation to the sender. export type InboundDMOperationSpecification = { +type: 'InboundDMOperationSpecification', +op: DMOperation, +metadata: ?InboundActionMetadata, }; export type DMOperationSpecification = | OutboundDMOperationSpecification | InboundDMOperationSpecification; async function createMessagesToPeersFromDMOp( operation: DMOperation, recipients: OutboundDMOperationSpecificationRecipients, allPeerUserIDAndDeviceIDs: $ReadOnlyArray<{ +userID: string, +deviceID: string, }>, utilities: ProcessDMOperationUtilities, ): Promise<$ReadOnlyArray> { const { viewerID, threadInfos } = utilities; if (!viewerID) { return []; } let peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs; if (recipients.type === 'self_devices') { peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter( peer => peer.userID === viewerID, ); } else if (recipients.type === 'some_users') { const userIDs = new Set(recipients.userIDs); peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter(peer => userIDs.has(peer.userID), ); } else if (recipients.type === 'all_thread_members') { const { threadID } = recipients; if (!threadInfos[threadID]) { console.log( `all_thread_members called for threadID ${threadID}, which is ` + 'missing from the ThreadStore. if sending a message soon after ' + 'thread creation, consider some_users instead', ); } const members = threadInfos[recipients.threadID]?.members ?? []; const memberIDs = members.map(member => member.id); const userIDs = new Set(memberIDs); peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter(peer => userIDs.has(peer.userID), ); } else if (recipients.type === 'some_devices') { const deviceIDs = new Set(recipients.deviceIDs); peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter(peer => deviceIDs.has(peer.deviceID), ); } const thisDeviceID = await getContentSigningKey(); const targetPeers = peerUserIDAndDeviceIDs.filter( peer => peer.deviceID !== thisDeviceID, ); return generateMessagesToPeers(operation, targetPeers); } function getCreateThickRawThreadInfoInputFromThreadInfo( threadInfo: ThickRawThreadInfo, ): CreateThickRawThreadInfoInput { const roleID = Object.keys(threadInfo.roles).pop(); const thickThreadType = assertThickThreadType(threadInfo.type); return { threadID: threadInfo.id, threadType: thickThreadType, creationTime: threadInfo.creationTime, parentThreadID: threadInfo.parentThreadID, allMemberIDsWithSubscriptions: threadInfo.members.map( ({ id, subscription }) => ({ id, subscription, }), ), roleID, unread: !!threadInfo.currentUser.unread, name: threadInfo.name, avatar: threadInfo.avatar, description: threadInfo.description, color: threadInfo.color, containingThreadID: threadInfo.containingThreadID, sourceMessageID: threadInfo.sourceMessageID, repliesCount: threadInfo.repliesCount, pinnedCount: threadInfo.pinnedCount, timestamps: threadInfo.timestamps, }; } function useAddDMThreadMembers(): ( newMemberIDs: $ReadOnlyArray, threadInfo: ThreadInfo, ) => Promise { const viewerID = useSelector( state => state.currentUserInfo && state.currentUserInfo.id, ); const processAndSendDMOperation = useProcessAndSendDMOperation(); const threadInfos = useSelector(state => state.threadStore.threadInfos); return React.useCallback( async (newMemberIDs: $ReadOnlyArray, threadInfo: ThreadInfo) => { const rawThreadInfo = threadInfos[threadInfo.id]; invariant(rawThreadInfo.thick, 'thread should be thick'); const existingThreadDetails = getCreateThickRawThreadInfoInputFromThreadInfo(rawThreadInfo); invariant(viewerID, 'viewerID should be set'); const addViewerToThreadMembersOperation: DMAddViewerToThreadMembersOperation = { type: 'add_viewer_to_thread_members', existingThreadDetails, editorID: viewerID, time: Date.now(), messageID: uuid.v4(), addedUserIDs: newMemberIDs, }; const viewerOperationSpecification: OutboundDMOperationSpecification = { type: dmOperationSpecificationTypes.OUTBOUND, op: addViewerToThreadMembersOperation, recipients: { type: 'some_users', userIDs: newMemberIDs, }, sendOnly: true, }; invariant(viewerID, 'viewerID should be set'); const addMembersOperation: DMAddMembersOperation = { type: 'add_members', threadID: threadInfo.id, editorID: viewerID, time: Date.now(), messageID: uuid.v4(), addedUserIDs: newMemberIDs, }; const newMemberIDsSet = new Set(newMemberIDs); const recipientsThreadID = threadInfo.type === thickThreadTypes.THICK_SIDEBAR && threadInfo.parentThreadID ? threadInfo.parentThreadID : threadInfo.id; const existingMembers = threadInfos[recipientsThreadID]?.members ?.map(member => member.id) ?.filter(memberID => !newMemberIDsSet.has(memberID)) ?? []; const addMembersOperationSpecification: OutboundDMOperationSpecification = { type: dmOperationSpecificationTypes.OUTBOUND, op: addMembersOperation, recipients: { type: 'some_users', userIDs: existingMembers, }, }; await Promise.all([ processAndSendDMOperation(viewerOperationSpecification), processAndSendDMOperation(addMembersOperationSpecification), ]); }, [processAndSendDMOperation, threadInfos, viewerID], ); } function getThreadUpdatesForNewMessages( rawMessageInfos: $ReadOnlyArray, updateInfos: $ReadOnlyArray, utilities: ProcessDMOperationUtilities, ): Array { const { threadInfos, viewerID } = utilities; const { rawMessageInfos: allNewMessageInfos } = mergeUpdatesWithMessageInfos( rawMessageInfos, updateInfos, ); const messagesByThreadID = _groupBy(message => message.threadID)( allNewMessageInfos, ); const updatedThreadInfosByThreadID: { [string]: RawThreadInfo | LegacyRawThreadInfo, } = {}; for (const threadID in messagesByThreadID) { updatedThreadInfosByThreadID[threadID] = threadInfos[threadID]; } for (const update of updateInfos) { const updatedThreadInfo = updateSpecs[update.type].getUpdatedThreadInfo?.( update, updatedThreadInfosByThreadID, ); if (updatedThreadInfo) { updatedThreadInfosByThreadID[updatedThreadInfo.id] = updatedThreadInfo; } } const newUpdateInfos: Array = []; for (const threadID in messagesByThreadID) { const repliesCountIncreasingMessages = messagesByThreadID[threadID].filter( message => messageSpecs[message.type].includedInRepliesCount, ); let threadInfo = updatedThreadInfosByThreadID[threadID]; if (repliesCountIncreasingMessages.length > 0) { const repliesCountIncreaseTime = Math.max( repliesCountIncreasingMessages.map(message => message.time), ); const newThreadInfo = { ...threadInfo, repliesCount: threadInfo.repliesCount + repliesCountIncreasingMessages.length, }; newUpdateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time: repliesCountIncreaseTime, threadInfo: newThreadInfo, }); threadInfo = newThreadInfo; } const messagesFromOtherPeers = messagesByThreadID[threadID].filter( message => message.creatorID !== viewerID, ); if (messagesFromOtherPeers.length === 0) { continue; } // We take the most recent timestamp to make sure that // change_thread_read_status operation older // than it won't flip the status to read. const time = Math.max(messagesFromOtherPeers.map(message => message.time)); invariant(threadInfo.thick, 'Thread should be thick'); // We aren't checking if the unread timestamp is lower than the time. // We're doing this because we want to flip the thread to unread after // any new message from a non-viewer. newUpdateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: uuid.v4(), time, threadID: threadInfo.id, unread: true, }); } return newUpdateInfos; } +async function checkMessageIDConflict( + dmOperation: DMOperation, + utilities: ProcessDMOperationUtilities, +): Promise { + if (!dmOperation.messageID) { + return { + isProcessingPossible: true, + }; + } + const messageID = dmOperation.messageID; + const message = await utilities.fetchMessage(messageID); + if (message) { + console.log( + `Discarded a ${dmOperation.type} operation because ` + + `message with the same ID ${messageID} already exists ` + + 'in the store', + ); + return { + isProcessingPossible: false, + reason: { + type: 'invalid', + }, + }; + } + return { + isProcessingPossible: true, + }; +} + export { createMessagesToPeersFromDMOp, useAddDMThreadMembers, getCreateThickRawThreadInfoInputFromThreadInfo, getThreadUpdatesForNewMessages, + checkMessageIDConflict, }; diff --git a/lib/shared/dm-ops/process-dm-ops.js b/lib/shared/dm-ops/process-dm-ops.js index e30994354..d3d6b4144 100644 --- a/lib/shared/dm-ops/process-dm-ops.js +++ b/lib/shared/dm-ops/process-dm-ops.js @@ -1,412 +1,419 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import type { ProcessDMOperationUtilities } from './dm-op-spec.js'; import { dmOpSpecs } from './dm-op-specs.js'; import { type OutboundDMOperationSpecification, type DMOperationSpecification, createMessagesToPeersFromDMOp, dmOperationSpecificationTypes, type OutboundComposableDMOperationSpecification, getThreadUpdatesForNewMessages, + checkMessageIDConflict, } from './dm-op-utils.js'; import { useProcessBlobHolders } from '../../actions/holder-actions.js'; import { processNewUserIDsActionType, useFindUserIdentities, } from '../../actions/user-actions.js'; import { useLoggedInUserInfo } from '../../hooks/account-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useDispatchWithMetadata } from '../../hooks/ops-hooks.js'; import { getAllPeerUserIDAndDeviceIDs } from '../../selectors/user-selectors.js'; import { usePeerToPeerCommunication, type ProcessOutboundP2PMessagesResult, } from '../../tunnelbroker/peer-to-peer-context.js'; import { processDMOpsActionType, queueDMOpsActionType, dmOperationValidator, } from '../../types/dm-ops.js'; import type { NotificationsCreationData } from '../../types/notif-types.js'; import type { DispatchMetadata } from '../../types/redux-types.js'; import type { OutboundP2PMessage } from '../../types/sqlite-types.js'; import { extractUserIDsFromPayload } from '../../utils/conversion-utils.js'; import { useSelector, useDispatch } from '../../utils/redux-utils.js'; function useSendDMOperationUtils() { const fetchMessage = useGetLatestMessageEdit(); const threadInfos = useSelector(state => state.threadStore.threadInfos); const entryInfos = useSelector(state => state.entryStore.entryInfos); const findUserIdentities = useFindUserIdentities(); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; return React.useMemo( () => ({ viewerID, fetchMessage, threadInfos, entryInfos, findUserIdentities, }), [viewerID, fetchMessage, threadInfos, entryInfos, findUserIdentities], ); } function useProcessDMOperation(): ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => Promise { const baseUtilities = useSendDMOperationUtils(); const dispatchWithMetadata = useDispatchWithMetadata(); const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const processBlobHolders = useProcessBlobHolders(); const dispatch = useDispatch(); return React.useCallback( async ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => { const { viewerID, ...restUtilities } = baseUtilities; if (!viewerID) { console.log('ignored DMOperation because logged out'); return; } const utilities: ProcessDMOperationUtilities = { ...restUtilities, viewerID, }; const { op: dmOp } = dmOperationSpecification; let outboundP2PMessages: ?$ReadOnlyArray = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND ) { outboundP2PMessages = await createMessagesToPeersFromDMOp( dmOp, dmOperationSpecification.recipients, allPeerUserIDAndDeviceIDs, utilities, ); } let dispatchMetadata: ?DispatchMetadata = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOpID ) { dispatchMetadata = { dmOpID, }; } else if ( dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND ) { dispatchMetadata = dmOperationSpecification.metadata; } let composableMessageID: ?string = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && !dmOpSpecs[dmOp.type].supportsAutoRetry ) { composableMessageID = dmOp.messageID; } if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOperationSpecification.sendOnly ) { const notificationsCreationData = await dmOpSpecs[ dmOp.type ].notificationsCreationData?.(dmOp, utilities); dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos: [], updateInfos: [], outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, dispatchMetadata, ); return; } - const processingCheckResult = await dmOpSpecs[dmOp.type].canBeProcessed( - dmOp, - utilities, - ); + const [messageIDConflictCheckResult, processingCheckResult] = + await Promise.all([ + checkMessageIDConflict(dmOp, utilities), + dmOpSpecs[dmOp.type].canBeProcessed(dmOp, utilities), + ]); + + if (!messageIDConflictCheckResult.isProcessingPossible) { + return; + } + if (!processingCheckResult.isProcessingPossible) { if (processingCheckResult.reason.type === 'invalid') { return; } let condition; if (processingCheckResult.reason.type === 'missing_thread') { condition = { type: 'thread', threadID: processingCheckResult.reason.threadID, }; } else if (processingCheckResult.reason.type === 'missing_entry') { condition = { type: 'entry', entryID: processingCheckResult.reason.entryID, }; } else if (processingCheckResult.reason.type === 'missing_message') { condition = { type: 'message', messageID: processingCheckResult.reason.messageID, }; } else if (processingCheckResult.reason.type === 'missing_membership') { condition = { type: 'membership', threadID: processingCheckResult.reason.threadID, userID: processingCheckResult.reason.userID, }; } dispatchWithMetadata( { type: queueDMOpsActionType, payload: { operation: dmOp, timestamp: Date.now(), condition, }, }, dispatchMetadata, ); return; } const newUserIDs = extractUserIDsFromPayload(dmOperationValidator, dmOp); if (newUserIDs.length > 0) { dispatch({ type: processNewUserIDsActionType, payload: { userIDs: newUserIDs }, }); } const dmOpSpec = dmOpSpecs[dmOp.type]; const notificationsCreationDataPromise: Promise = (async () => { if ( dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND || !dmOpSpec.notificationsCreationData ) { return null; } return await dmOpSpec.notificationsCreationData(dmOp, utilities); })(); const [ { rawMessageInfos, updateInfos, blobOps }, notificationsCreationData, ] = await Promise.all([ dmOpSpec.processDMOperation(dmOp, utilities), notificationsCreationDataPromise, ]); const holderOps = blobOps .map(({ dmOpType, ...holderOp }) => { if ( (dmOpType === 'inbound_only' && dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND) || (dmOpType === 'outbound_only' && dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND) ) { return null; } return holderOp; }) .filter(Boolean); void processBlobHolders(holderOps); const newUpdateInfos = getThreadUpdatesForNewMessages( rawMessageInfos, updateInfos, utilities, ); updateInfos.push(...newUpdateInfos); dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos, updateInfos, outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, dispatchMetadata, ); }, [ baseUtilities, dispatchWithMetadata, allPeerUserIDAndDeviceIDs, dispatch, processBlobHolders, ], ); } function useProcessAndSendDMOperation(): ( dmOperationSpecification: OutboundDMOperationSpecification, ) => Promise { const processDMOps = useProcessDMOperation(); const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); return React.useCallback( async (dmOperationSpecification: OutboundDMOperationSpecification) => { const { promise, dmOpID } = getDMOpsSendingPromise(); await processDMOps(dmOperationSpecification, dmOpID); await promise; }, [getDMOpsSendingPromise, processDMOps], ); } function useSendComposableDMOperation(): ( dmOperationSpecification: OutboundComposableDMOperationSpecification, ) => Promise { const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); const dispatchWithMetadata = useDispatchWithMetadata(); const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const baseUtilities = useSendDMOperationUtils(); const { processOutboundMessages } = usePeerToPeerCommunication(); const localMessageInfos = useSelector(state => state.messageStore.local); return React.useCallback( async ( dmOperationSpecification: OutboundComposableDMOperationSpecification, ): Promise => { const { viewerID, ...restUtilities } = baseUtilities; if (!viewerID) { console.log('ignored DMOperation because logged out'); return { result: 'failure', failedMessageIDs: [], }; } const utilities: ProcessDMOperationUtilities = { ...restUtilities, viewerID, }; const { promise, dmOpID } = getDMOpsSendingPromise(); const { op, composableMessageID, recipients } = dmOperationSpecification; const localMessageInfo = localMessageInfos[composableMessageID]; if ( localMessageInfo?.outboundP2PMessageIDs && localMessageInfo.outboundP2PMessageIDs.length > 0 ) { processOutboundMessages(localMessageInfo.outboundP2PMessageIDs, dmOpID); try { // This code should never throw. return await promise; } catch (e) { invariant( localMessageInfo.outboundP2PMessageIDs, 'outboundP2PMessageIDs should be defined', ); return { result: 'failure', failedMessageIDs: localMessageInfo.outboundP2PMessageIDs, }; } } const outboundP2PMessages = await createMessagesToPeersFromDMOp( op, recipients, allPeerUserIDAndDeviceIDs, utilities, ); const spec = dmOpSpecs[op.type]; const notificationsCreationDataPromise: Promise = (async () => { if (!spec?.notificationsCreationData) { return null; } return await spec.notificationsCreationData(op, utilities); })(); const [{ rawMessageInfos, updateInfos }, notificationsCreationData] = await Promise.all([ dmOpSpecs[op.type].processDMOperation(op, utilities), notificationsCreationDataPromise, ]); const newUpdateInfos = getThreadUpdatesForNewMessages( rawMessageInfos, updateInfos, utilities, ); dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos: [], updateInfos: newUpdateInfos, outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, { dmOpID, }, ); try { // This code should never throw. return await promise; } catch (e) { return { result: 'failure', failedMessageIDs: outboundP2PMessages.map( message => message.messageID, ), }; } }, [ allPeerUserIDAndDeviceIDs, dispatchWithMetadata, getDMOpsSendingPromise, localMessageInfos, processOutboundMessages, baseUtilities, ], ); } export { useProcessDMOperation, useProcessAndSendDMOperation, useSendComposableDMOperation, }; diff --git a/lib/shared/dm-ops/send-edit-message-spec.js b/lib/shared/dm-ops/send-edit-message-spec.js index 44ef4e861..b2e1e9094 100644 --- a/lib/shared/dm-ops/send-edit-message-spec.js +++ b/lib/shared/dm-ops/send-edit-message-spec.js @@ -1,71 +1,73 @@ // @flow import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import type { DMSendEditMessageOperation } from '../../types/dm-ops.js'; import { messageTypes } from '../../types/message-types-enum.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; function createMessageDataWithInfoFromDMOperation( dmOperation: DMSendEditMessageOperation, ) { const { threadID, creatorID, time, targetMessageID, text, messageID } = dmOperation; const messageData = { type: messageTypes.EDIT_MESSAGE, threadID, creatorID, time, targetMessageID, text, }; const rawMessageInfo = rawMessageInfoFromMessageData(messageData, messageID); return { messageData, rawMessageInfo }; } const sendEditMessageSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async ( dmOperation: DMSendEditMessageOperation, ) => { return { messageDatasWithMessageInfos: [ createMessageDataWithInfoFromDMOperation(dmOperation), ], }; }, processDMOperation: async (dmOperation: DMSendEditMessageOperation) => { const { rawMessageInfo } = createMessageDataWithInfoFromDMOperation(dmOperation); const rawMessageInfos = [rawMessageInfo]; return { rawMessageInfos, updateInfos: [], blobOps: [], }; }, canBeProcessed: async ( dmOperation: DMSendEditMessageOperation, utilities: ProcessDMOperationUtilities, ) => { - const message = await utilities.fetchMessage(dmOperation.targetMessageID); - if (!message) { + const targetMessage = await utilities.fetchMessage( + dmOperation.targetMessageID, + ); + if (!targetMessage) { return { isProcessingPossible: false, reason: { type: 'missing_message', messageID: dmOperation.targetMessageID, }, }; } return { isProcessingPossible: true, }; }, supportsAutoRetry: true, }); export { sendEditMessageSpec }; diff --git a/lib/shared/dm-ops/send-multimedia-message-spec.js b/lib/shared/dm-ops/send-multimedia-message-spec.js index 1347d23c9..2ee1a914d 100644 --- a/lib/shared/dm-ops/send-multimedia-message-spec.js +++ b/lib/shared/dm-ops/send-multimedia-message-spec.js @@ -1,106 +1,106 @@ // @flow import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import { encryptedMediaBlobURI, encryptedVideoThumbnailBlobURI, } from '../../media/media-utils.js'; import type { DMSendMultimediaMessageOperation, DMBlobOperation, } from '../../types/dm-ops.js'; import { messageTypes } from '../../types/message-types-enum.js'; import type { ClientUpdateInfo } from '../../types/update-types.js'; import { blobHashFromBlobServiceURI } from '../../utils/blob-service.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; function createMessageDataWithInfoFromDMOperation( dmOperation: DMSendMultimediaMessageOperation, ) { const { threadID, creatorID, time, media, messageID } = dmOperation; const messageData = { type: messageTypes.MULTIMEDIA, threadID, creatorID, time, media, }; const rawMessageInfo = rawMessageInfoFromMessageData(messageData, messageID); return { messageData, rawMessageInfo }; } function getBlobOpsFromOperation( dmOperation: DMSendMultimediaMessageOperation, ): Array { const ops: Array = []; for (const media of dmOperation.media) { if (media.type !== 'encrypted_photo' && media.type !== 'encrypted_video') { continue; } const blobURI = encryptedMediaBlobURI(media); ops.push({ type: 'establish_holder', blobHash: blobHashFromBlobServiceURI(blobURI), dmOpType: 'inbound_only', }); if (media.type === 'encrypted_video') { const thumbnailBlobURI = encryptedVideoThumbnailBlobURI(media); ops.push({ type: 'establish_holder', blobHash: blobHashFromBlobServiceURI(thumbnailBlobURI), dmOpType: 'inbound_only', }); } } return ops; } const sendMultimediaMessageSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async ( dmOperation: DMSendMultimediaMessageOperation, ) => { return { messageDatasWithMessageInfos: [ createMessageDataWithInfoFromDMOperation(dmOperation), ], }; }, processDMOperation: async ( dmOperation: DMSendMultimediaMessageOperation, ) => { const { rawMessageInfo } = createMessageDataWithInfoFromDMOperation(dmOperation); const rawMessageInfos = [rawMessageInfo]; const updateInfos: Array = []; const blobOps = getBlobOpsFromOperation(dmOperation); return { rawMessageInfos, updateInfos, blobOps, }; }, canBeProcessed: async ( dmOperation: DMSendMultimediaMessageOperation, utilities: ProcessDMOperationUtilities, ) => { - if (utilities.threadInfos[dmOperation.threadID]) { - return { isProcessingPossible: true }; + if (!utilities.threadInfos[dmOperation.threadID]) { + return { + isProcessingPossible: false, + reason: { + type: 'missing_thread', + threadID: dmOperation.threadID, + }, + }; } - return { - isProcessingPossible: false, - reason: { - type: 'missing_thread', - threadID: dmOperation.threadID, - }, - }; + return { isProcessingPossible: true }; }, supportsAutoRetry: false, }); export { sendMultimediaMessageSpec }; diff --git a/lib/shared/dm-ops/send-reaction-message-spec.js b/lib/shared/dm-ops/send-reaction-message-spec.js index 2d3bf051d..62a0b5113 100644 --- a/lib/shared/dm-ops/send-reaction-message-spec.js +++ b/lib/shared/dm-ops/send-reaction-message-spec.js @@ -1,79 +1,81 @@ // @flow import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import type { DMSendReactionMessageOperation } from '../../types/dm-ops.js'; import { messageTypes } from '../../types/message-types-enum.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; function createMessageDataWithInfoFromDMOperation( dmOperation: DMSendReactionMessageOperation, ) { const { threadID, creatorID, time, targetMessageID, reaction, action, messageID, } = dmOperation; const messageData = { type: messageTypes.REACTION, threadID, creatorID, time, targetMessageID, reaction, action, }; const rawMessageInfo = rawMessageInfoFromMessageData(messageData, messageID); return { messageData, rawMessageInfo }; } const sendReactionMessageSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async ( dmOperation: DMSendReactionMessageOperation, ) => { return { messageDatasWithMessageInfos: [ createMessageDataWithInfoFromDMOperation(dmOperation), ], }; }, processDMOperation: async (dmOperation: DMSendReactionMessageOperation) => { const { rawMessageInfo } = createMessageDataWithInfoFromDMOperation(dmOperation); const rawMessageInfos = [rawMessageInfo]; return { rawMessageInfos, updateInfos: [], blobOps: [], }; }, canBeProcessed: async ( dmOperation: DMSendReactionMessageOperation, utilities: ProcessDMOperationUtilities, ) => { - const message = await utilities.fetchMessage(dmOperation.targetMessageID); - if (!message) { + const targetMessage = await utilities.fetchMessage( + dmOperation.targetMessageID, + ); + if (!targetMessage) { return { isProcessingPossible: false, reason: { type: 'missing_message', messageID: dmOperation.targetMessageID, }, }; } return { isProcessingPossible: true, }; }, supportsAutoRetry: true, }); export { sendReactionMessageSpec }; diff --git a/lib/shared/dm-ops/send-text-message-spec.js b/lib/shared/dm-ops/send-text-message-spec.js index d2aa83011..149fcc347 100644 --- a/lib/shared/dm-ops/send-text-message-spec.js +++ b/lib/shared/dm-ops/send-text-message-spec.js @@ -1,67 +1,67 @@ // @flow import type { DMOperationSpec, ProcessDMOperationUtilities, } from './dm-op-spec.js'; import type { DMSendTextMessageOperation } from '../../types/dm-ops.js'; import { messageTypes } from '../../types/message-types-enum.js'; import type { ClientUpdateInfo } from '../../types/update-types.js'; import { rawMessageInfoFromMessageData } from '../message-utils.js'; function createMessageDataWithInfoFromDMOperation( dmOperation: DMSendTextMessageOperation, ) { const { threadID, creatorID, time, text, messageID } = dmOperation; const messageData = { type: messageTypes.TEXT, threadID, creatorID, time, text, }; const rawMessageInfo = rawMessageInfoFromMessageData(messageData, messageID); return { messageData, rawMessageInfo }; } const sendTextMessageSpec: DMOperationSpec = Object.freeze({ notificationsCreationData: async ( dmOperation: DMSendTextMessageOperation, ) => { return { messageDatasWithMessageInfos: [ createMessageDataWithInfoFromDMOperation(dmOperation), ], }; }, processDMOperation: async (dmOperation: DMSendTextMessageOperation) => { const { rawMessageInfo } = createMessageDataWithInfoFromDMOperation(dmOperation); const rawMessageInfos = [rawMessageInfo]; const updateInfos: Array = []; return { rawMessageInfos, updateInfos, blobOps: [], }; }, canBeProcessed: async ( dmOperation: DMSendTextMessageOperation, utilities: ProcessDMOperationUtilities, ) => { - if (utilities.threadInfos[dmOperation.threadID]) { - return { isProcessingPossible: true }; + if (!utilities.threadInfos[dmOperation.threadID]) { + return { + isProcessingPossible: false, + reason: { + type: 'missing_thread', + threadID: dmOperation.threadID, + }, + }; } - return { - isProcessingPossible: false, - reason: { - type: 'missing_thread', - threadID: dmOperation.threadID, - }, - }; + return { isProcessingPossible: true }; }, supportsAutoRetry: false, }); export { sendTextMessageSpec };