diff --git a/lib/hooks/input-state-container-hooks.js b/lib/hooks/input-state-container-hooks.js index 30afb76b1..cb60e2774 100644 --- a/lib/hooks/input-state-container-hooks.js +++ b/lib/hooks/input-state-container-hooks.js @@ -1,219 +1,219 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { useLegacySendMultimediaMessage, useSendMultimediaMessage, useSendTextMessage, } from '../actions/message-actions.js'; import { dmOperationSpecificationTypes } from '../shared/dm-ops/dm-op-utils.js'; import { useSendComposableDMOperation } from '../shared/dm-ops/process-dm-ops.js'; import type { RawMultimediaMessageInfo, SendMessagePayload, } from '../types/message-types.js'; import { getMediaMessageServerDBContentsFromMedia } from '../types/messages/media.js'; import type { RawTextMessageInfo } from '../types/messages/text.js'; import type { ThreadInfo } from '../types/minimally-encoded-thread-permissions-types.js'; import { thickThreadTypes, threadTypeIsThick, } from '../types/thread-types-enum.js'; import { useSelector } from '../utils/redux-utils.js'; function useInputStateContainerSendTextMessage(): ( messageInfo: RawTextMessageInfo, threadInfo: ThreadInfo, parentThreadInfo: ?ThreadInfo, sidebarCreation: boolean, ) => Promise { const sendTextMessage = useSendTextMessage(); const sendComposableDMOperation = useSendComposableDMOperation(); return React.useCallback( async ( messageInfo: RawTextMessageInfo, threadInfo: ThreadInfo, parentThreadInfo: ?ThreadInfo, sidebarCreation: boolean, ) => { const { localID } = messageInfo; invariant( localID !== null && localID !== undefined, 'localID should be set', ); if (!threadTypeIsThick(threadInfo.type)) { const result = await sendTextMessage({ threadID: messageInfo.threadID, localID, text: messageInfo.text, sidebarCreation, }); return { localID, serverID: result.id, threadID: messageInfo.threadID, time: result.time, }; } const messageID = uuid.v4(); const time = Date.now(); const recipients = threadInfo.type === thickThreadTypes.THICK_SIDEBAR && parentThreadInfo ? parentThreadInfo.members : threadInfo.members; const recipientsIDs = recipients.map(recipient => recipient.id); - const messageIDs = await sendComposableDMOperation({ + const result = await sendComposableDMOperation({ type: dmOperationSpecificationTypes.OUTBOUND, op: { type: 'send_text_message', threadID: threadInfo.id, creatorID: messageInfo.creatorID, time, messageID, text: messageInfo.text, }, // We need to use a different mechanism than `all_thread_members` // because when creating a thread, the thread might not yet // be in the store. recipients: { type: 'some_users', userIDs: recipientsIDs, }, sendOnly: true, composableMessageID: localID, }); - if (messageIDs.length > 0) { + if (result.result === 'failure' && result.failedMessageIDs.length > 0) { const e: any = new Error('Failed to send message to all peers'); - e.failedOutboundP2PMessageIDs = messageIDs; + e.failedOutboundP2PMessageIDs = result.failedMessageIDs; throw e; } return { localID, serverID: messageID, threadID: messageInfo.threadID, time, }; }, [sendComposableDMOperation, sendTextMessage], ); } function useInputStateContainerSendMultimediaMessage(): ( messageInfo: RawMultimediaMessageInfo, sidebarCreation: boolean, isLegacy: boolean, ) => Promise { const sendMultimediaMessage = useSendMultimediaMessage(); const legacySendMultimediaMessage = useLegacySendMultimediaMessage(); const sendComposableDMOperation = useSendComposableDMOperation(); const threadInfos = useSelector(state => state.threadStore.threadInfos); return React.useCallback( async ( messageInfo: RawMultimediaMessageInfo, sidebarCreation: boolean, isLegacy: boolean, ) => { const { localID } = messageInfo; invariant( localID !== null && localID !== undefined, 'localID should be set', ); const threadInfo = threadInfos[messageInfo.threadID]; if (!threadTypeIsThick(threadInfo.type) && isLegacy) { const mediaIDs = []; for (const { id } of messageInfo.media) { mediaIDs.push(id); } const result = await legacySendMultimediaMessage({ threadID: messageInfo.threadID, localID, mediaIDs, sidebarCreation, }); return { localID, serverID: result.id, threadID: messageInfo.threadID, time: result.time, }; } if (!threadTypeIsThick(threadInfo.type) && !isLegacy) { const mediaMessageContents = getMediaMessageServerDBContentsFromMedia( messageInfo.media, ); const result = await sendMultimediaMessage({ threadID: messageInfo.threadID, localID, mediaMessageContents, sidebarCreation, }); return { localID, serverID: result.id, threadID: messageInfo.threadID, time: result.time, }; } const messageID = uuid.v4(); const time = Date.now(); - const messageIDs = await sendComposableDMOperation({ + const result = await sendComposableDMOperation({ type: dmOperationSpecificationTypes.OUTBOUND, op: { type: 'send_multimedia_message', threadID: threadInfo.id, creatorID: messageInfo.creatorID, time: Date.now(), messageID, media: messageInfo.media, }, recipients: { type: 'all_thread_members', threadID: threadInfo.type === thickThreadTypes.THICK_SIDEBAR && threadInfo.parentThreadID ? threadInfo.parentThreadID : threadInfo.id, }, sendOnly: true, composableMessageID: localID, }); - if (messageIDs.length > 0) { + if (result.result === 'failure' && result.failedMessageIDs.length > 0) { const e: any = new Error('Failed to send message to all peers'); - e.failedOutboundP2PMessageIDs = messageIDs; + e.failedOutboundP2PMessageIDs = result.failedMessageIDs; throw e; } return { localID, serverID: messageID, threadID: messageInfo.threadID, time, }; }, [ legacySendMultimediaMessage, sendComposableDMOperation, sendMultimediaMessage, threadInfos, ], ); } export { useInputStateContainerSendTextMessage, useInputStateContainerSendMultimediaMessage, }; diff --git a/lib/shared/dm-ops/process-dm-ops.js b/lib/shared/dm-ops/process-dm-ops.js index 2d3611acb..ff5fb945c 100644 --- a/lib/shared/dm-ops/process-dm-ops.js +++ b/lib/shared/dm-ops/process-dm-ops.js @@ -1,386 +1,389 @@ // @flow import invariant from 'invariant'; import _groupBy from 'lodash/fp/groupBy.js'; import * as React from 'react'; import uuid from 'uuid'; import type { ProcessDMOperationUtilities } from './dm-op-spec.js'; import { dmOpSpecs } from './dm-op-specs.js'; import { type OutboundDMOperationSpecification, type DMOperationSpecification, createMessagesToPeersFromDMOp, dmOperationSpecificationTypes, type OutboundComposableDMOperationSpecification, } from './dm-op-utils.js'; import { processNewUserIDsActionType } from '../../actions/user-actions.js'; import { useLoggedInUserInfo } from '../../hooks/account-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useDispatchWithMetadata } from '../../hooks/ops-hooks.js'; import { mergeUpdatesWithMessageInfos } from '../../reducers/message-reducer.js'; import { getAllPeerUserIDAndDeviceIDs } from '../../selectors/user-selectors.js'; -import { usePeerToPeerCommunication } from '../../tunnelbroker/peer-to-peer-context.js'; +import { + usePeerToPeerCommunication, + type ProcessOutboundP2PMessagesResult, +} from '../../tunnelbroker/peer-to-peer-context.js'; import { processDMOpsActionType, queueDMOpsActionType, dmOperationValidator, } from '../../types/dm-ops.js'; import type { RawThreadInfo } from '../../types/minimally-encoded-thread-permissions-types.js'; import type { DispatchMetadata } from '../../types/redux-types.js'; import type { OutboundP2PMessage } from '../../types/sqlite-types.js'; import type { LegacyRawThreadInfo } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import { extractUserIDsFromPayload } from '../../utils/conversion-utils.js'; import { useSelector, useDispatch } from '../../utils/redux-utils.js'; import { messageSpecs } from '../messages/message-specs.js'; import { updateSpecs } from '../updates/update-specs.js'; function useSendDMOperationUtils(): ProcessDMOperationUtilities { const fetchMessage = useGetLatestMessageEdit(); const threadInfos = useSelector(state => state.threadStore.threadInfos); return React.useMemo( () => ({ fetchMessage, threadInfos, }), [fetchMessage, threadInfos], ); } function useProcessDMOperation(): ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => Promise { const threadInfos = useSelector(state => state.threadStore.threadInfos); const utilities = useSendDMOperationUtils(); const dispatchWithMetadata = useDispatchWithMetadata(); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const currentUserInfo = useSelector(state => state.currentUserInfo); const dispatch = useDispatch(); return React.useCallback( async ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => { if (!viewerID) { console.log('ignored DMOperation because logged out'); return; } const { op: dmOp } = dmOperationSpecification; let outboundP2PMessages: ?$ReadOnlyArray = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND ) { outboundP2PMessages = await createMessagesToPeersFromDMOp( dmOp, dmOperationSpecification.recipients, allPeerUserIDAndDeviceIDs, currentUserInfo, threadInfos, ); } let dispatchMetadata: ?DispatchMetadata = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOpID ) { dispatchMetadata = { dmOpID, }; } else if ( dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND ) { dispatchMetadata = dmOperationSpecification.metadata; } let composableMessageID: ?string = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && !dmOpSpecs[dmOp.type].supportsAutoRetry ) { composableMessageID = dmOp.messageID; } if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOperationSpecification.sendOnly ) { const notificationsCreationData = await dmOpSpecs[ dmOp.type ].notificationsCreationData?.(dmOp, utilities); dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos: [], updateInfos: [], outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, dispatchMetadata, ); return; } const processingCheckResult = dmOpSpecs[dmOp.type].canBeProcessed( dmOp, viewerID, utilities, ); if (!processingCheckResult.isProcessingPossible) { if (processingCheckResult.reason.type === 'missing_thread') { dispatchWithMetadata( { type: queueDMOpsActionType, payload: { operation: dmOp, threadID: processingCheckResult.reason.threadID, timestamp: Date.now(), }, }, dispatchMetadata, ); } return; } const newUserIDs = extractUserIDsFromPayload(dmOperationValidator, dmOp); if (newUserIDs.length > 0) { dispatch({ type: processNewUserIDsActionType, payload: { userIDs: newUserIDs }, }); } const dmOpSpec = dmOpSpecs[dmOp.type]; const notificationsCreationDataPromise = (async () => { return await dmOpSpec.notificationsCreationData?.(dmOp, utilities); })(); const [{ rawMessageInfos, updateInfos }, notificationsCreationData] = await Promise.all([ dmOpSpec.processDMOperation(dmOp, viewerID, utilities), notificationsCreationDataPromise, ]); const { rawMessageInfos: allNewMessageInfos } = mergeUpdatesWithMessageInfos(rawMessageInfos, updateInfos); const messagesByThreadID = _groupBy(message => message.threadID)( allNewMessageInfos, ); const updatedThreadInfosByThreadID: { [string]: RawThreadInfo | LegacyRawThreadInfo, } = {}; for (const threadID in messagesByThreadID) { updatedThreadInfosByThreadID[threadID] = threadInfos[threadID]; } for (const update of updateInfos) { const updatedThreadInfo = updateSpecs[ update.type ].getUpdatedThreadInfo?.(update, updatedThreadInfosByThreadID); if (updatedThreadInfo) { updatedThreadInfosByThreadID[updatedThreadInfo.id] = updatedThreadInfo; } } for (const threadID in messagesByThreadID) { const repliesCountIncreasingMessages = messagesByThreadID[ threadID ].filter(message => messageSpecs[message.type].includedInRepliesCount); const threadInfo = updatedThreadInfosByThreadID[threadID]; if (repliesCountIncreasingMessages.length > 0) { const repliesCountIncreaseTime = Math.max( repliesCountIncreasingMessages.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time: repliesCountIncreaseTime, threadInfo: { ...threadInfo, repliesCount: threadInfo.repliesCount + repliesCountIncreasingMessages.length, }, }); } const messagesFromOtherPeers = messagesByThreadID[threadID].filter( message => message.creatorID !== viewerID, ); if (messagesFromOtherPeers.length === 0) { continue; } // We take the most recent timestamp to make sure that // change_thread_read_status operation older // than it won't flip the status to read. const time = Math.max( messagesFromOtherPeers.map(message => message.time), ); invariant(threadInfo.thick, 'Thread should be thick'); // We aren't checking if the unread timestamp is lower than the time. // We're doing this because we want to flip the thread to unread after // any new message from a non-viewer. const updatedThreadInfo = threadInfo.minimallyEncoded ? { ...threadInfo, currentUser: { ...threadInfo.currentUser, unread: true, }, timestamps: { ...threadInfo.timestamps, currentUser: { ...threadInfo.timestamps.currentUser, unread: time, }, }, } : { ...threadInfo, currentUser: { ...threadInfo.currentUser, unread: true, }, timestamps: { ...threadInfo.timestamps, currentUser: { ...threadInfo.timestamps.currentUser, unread: time, }, }, }; updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time, threadInfo: updatedThreadInfo, }); } dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos, updateInfos, outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, dispatchMetadata, ); }, [ viewerID, utilities, dispatchWithMetadata, allPeerUserIDAndDeviceIDs, currentUserInfo, threadInfos, dispatch, ], ); } function useProcessAndSendDMOperation(): ( dmOperationSpecification: OutboundDMOperationSpecification, ) => Promise { const processDMOps = useProcessDMOperation(); const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); return React.useCallback( async (dmOperationSpecification: OutboundDMOperationSpecification) => { const { promise, dmOpID } = getDMOpsSendingPromise(); await processDMOps(dmOperationSpecification, dmOpID); await promise; }, [getDMOpsSendingPromise, processDMOps], ); } function useSendComposableDMOperation(): ( dmOperationSpecification: OutboundComposableDMOperationSpecification, -) => Promise<$ReadOnlyArray> { +) => Promise { const threadInfos = useSelector(state => state.threadStore.threadInfos); const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); const dispatchWithMetadata = useDispatchWithMetadata(); const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const currentUserInfo = useSelector(state => state.currentUserInfo); const utilities = useSendDMOperationUtils(); return React.useCallback( async ( dmOperationSpecification: OutboundComposableDMOperationSpecification, - ): Promise<$ReadOnlyArray> => { + ): Promise => { const { promise, dmOpID } = getDMOpsSendingPromise(); const { op, composableMessageID, recipients } = dmOperationSpecification; const outboundP2PMessages = await createMessagesToPeersFromDMOp( op, recipients, allPeerUserIDAndDeviceIDs, currentUserInfo, threadInfos, ); const notificationsCreationData = await dmOpSpecs[ op.type ].notificationsCreationData?.(op, utilities); dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos: [], updateInfos: [], outboundP2PMessages, composableMessageID, notificationsCreationData, }, }, { dmOpID, }, ); return promise; }, [ allPeerUserIDAndDeviceIDs, currentUserInfo, dispatchWithMetadata, getDMOpsSendingPromise, threadInfos, utilities, ], ); } export { useProcessDMOperation, useProcessAndSendDMOperation, useSendComposableDMOperation, }; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index e4da368b1..1e9ae091e 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,411 +1,436 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { type AuthMetadata, IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; import type { NotificationsCreationData } from '../types/notif-types.js'; import { type OutboundP2PMessage, outboundP2PMessageStatuses, } from '../types/sqlite-types.js'; import { type EncryptedMessage, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getMessageForException } from '../utils/errors.js'; import { entries } from '../utils/objects.js'; import { olmSessionErrors } from '../utils/olm-utils.js'; type PeerToPeerContextType = { +processOutboundMessages: ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => void, +getDMOpsSendingPromise: () => { - +promise: Promise<$ReadOnlyArray>, + +promise: Promise, +dmOpID: string, }, +broadcastEphemeralMessage: ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => Promise, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; +export type ProcessOutboundP2PMessagesResult = + | { +result: 'success' } + | { + +result: 'failure', + +failedMessageIDs: $ReadOnlyArray, + }; + async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, peerOlmSessionsCreator: (userID: string, deviceID: string) => Promise, messageIDs: ?$ReadOnlyArray, -): Promise<$ReadOnlyArray> { +): Promise { let authMetadata; try { authMetadata = await identityContext.getAuthMetadata(); } catch (e) { - return []; + return { + result: 'failure', + failedMessageIDs: messageIDs ?? [], + }; } if ( !authMetadata.deviceID || !authMetadata.userID || !authMetadata.accessToken ) { - return []; + return { + result: 'failure', + failedMessageIDs: messageIDs ?? [], + }; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); const sentMessagesMap: { [messageID: string]: boolean } = {}; let messages; if (messageIDs) { messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); if (messageIDs.length !== messages.length) { const dbMessageIDsSet = new Set( messages.map(message => message.messageID), ); for (const messageID of messageIDs) { if (!dbMessageIDsSet.has(messageID)) { sentMessagesMap[messageID] = true; } } } } else { const allMessages = await sqliteAPI.getAllOutboundP2PMessages(); messages = allMessages.filter(message => message.supportsAutoRetry); } const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; for (const message: OutboundP2PMessage of messages) { if (!devicesMap[message.deviceID]) { devicesMap[message.deviceID] = [message]; } else { devicesMap[message.deviceID].push(message); } } const sendMessageToPeer = async ( message: OutboundP2PMessage, ): Promise => { if (!authMetadata.deviceID || !authMetadata.userID) { return; } try { const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo: { deviceID: authMetadata.deviceID, userID: authMetadata.userID, }, encryptedData: JSON.parse(message.ciphertext), }; await sendMessage( { deviceID: message.deviceID, payload: JSON.stringify(encryptedMessage), }, message.messageID, ); await sqliteAPI.markOutboundP2PMessageAsSent( message.messageID, message.deviceID, ); sentMessagesMap[message.messageID] = true; } catch (e) { console.error(e); } }; const devicePromises = entries(devicesMap).map( async ([peerDeviceID, deviceMessages]) => { for (const message of deviceMessages) { if (message.status === outboundP2PMessageStatuses.persisted) { try { const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (e) { if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) { console.log(`Error sending messages to peer ${peerDeviceID}`, e); break; } try { await peerOlmSessionsCreator(message.userID, peerDeviceID); const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (err) { console.log( `Error sending messages to peer ${peerDeviceID}`, err, ); break; } } } else if (message.status === outboundP2PMessageStatuses.encrypted) { await sendMessageToPeer(message); } else if (message.status === outboundP2PMessageStatuses.sent) { // Handle edge-case when message was sent, but it wasn't updated // in the message store. sentMessagesMap[message.messageID] = true; } } }, ); await Promise.all(devicePromises); - // Returning messageIDs of failed messages. - const sentMessages = new Set(Object.keys(sentMessagesMap)); - return messageIDs?.filter(id => !sentMessages.has(id)) ?? []; + + const sentMessagesSet = new Set(Object.keys(sentMessagesMap)); + const failedMessageIDs = + messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? []; + if (failedMessageIDs.length > 0) { + return { + result: 'failure', + failedMessageIDs, + }; + } + return { + result: 'success', + }; } const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; function PeerToPeerProvider(props: Props): React.Node { const { children } = props; const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const dmOpsSendingPromiseResolvers = React.useRef< Map< string, { - +resolve: (messageIDs: $ReadOnlyArray) => mixed, + +resolve: (result: ProcessOutboundP2PMessagesResult) => mixed, +reject: Error => mixed, }, >, >(new Map()); // This returns a promise that will be resolved with arrays of successfully // sent messages, so in case of failing all messages (e.g. no internet // connection) it will still resolve but with an empty array. const getDMOpsSendingPromise = React.useCallback(() => { const dmOpID = uuid.v4(); - const promise = new Promise<$ReadOnlyArray>((resolve, reject) => { - dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); - }); + const promise = new Promise( + (resolve, reject) => { + dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); + }, + ); return { promise, dmOpID }; }, []); const processingQueue = React.useRef< Array<{ +outboundMessageIDs: ?$ReadOnlyArray, +dmOpID: ?string, +notificationsCreationData: ?NotificationsCreationData, }>, >([]); const promiseRunning = React.useRef(false); const { createOlmSessionsWithPeer: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const sendPushNotifs = useSendPushNotifs(); const processOutboundMessages = React.useCallback( ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, notificationsCreationData: ?NotificationsCreationData, ) => { processingQueue.current.push({ outboundMessageIDs, dmOpID, notificationsCreationData, }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { const queueFront = processingQueue.current.shift(); try { - const [sentMessagesIDs] = await Promise.all([ + const [result] = await Promise.all([ processOutboundP2PMessages( sendMessageToDevice, identityContext, peerOlmSessionsCreator, queueFront?.outboundMessageIDs, ), sendPushNotifs(queueFront.notificationsCreationData), ]); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) - ?.resolve?.(sentMessagesIDs); + ?.resolve?.(result); } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.reject?.(e); } } finally { if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); } } } while (processingQueue.current.length > 0); promiseRunning.current = false; })(); } }, [ sendPushNotifs, peerOlmSessionsCreator, identityContext, sendMessageToDevice, ], ); const broadcastEphemeralMessage = React.useCallback( async ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => { const { userID: thisUserID, deviceID: thisDeviceID } = authMetadata; if (!thisDeviceID || !thisUserID) { throw new Error('No auth metadata'); } const { olmAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); // We want it distinct by device ID to avoid potentially creating // multiple Olm sessions with the same device simultaneously. const recipientsDistinctByDeviceID = [ ...new Map(recipients.map(item => [item.deviceID, item])).values(), ]; const senderInfo = { deviceID: thisDeviceID, userID: thisUserID }; const promises = recipientsDistinctByDeviceID.map(async recipient => { try { const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessageToDevice({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); } catch (e) { if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) { console.log( `Error sending messages to peer ${recipient.deviceID}`, e, ); return; } try { await peerOlmSessionsCreator(recipient.userID, recipient.deviceID); const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessageToDevice({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); } catch (err) { console.warn( `Error sending Olm-encrypted message to device ${recipient.deviceID}:`, err, ); } } }); await Promise.all(promises); }, [peerOlmSessionsCreator, sendMessageToDevice], ); React.useEffect(() => { const intervalID = setInterval( processOutboundMessages, AUTOMATIC_RETRY_FREQUENCY, ); return () => clearInterval(intervalID); }, [processOutboundMessages]); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, getDMOpsSendingPromise, broadcastEphemeralMessage, }), [ broadcastEphemeralMessage, processOutboundMessages, getDMOpsSendingPromise, ], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication };