diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js index ea8148b86..c14fee024 100644 --- a/lib/handlers/db-ops-handler.react.js +++ b/lib/handlers/db-ops-handler.react.js @@ -1,86 +1,90 @@ // @flow import * as React from 'react'; import { opsProcessingFinishedActionType } from '../actions/db-ops-actions.js'; import { usePeerToPeerCommunication } from '../tunnelbroker/peer-to-peer-context.js'; import { useTunnelbroker } from '../tunnelbroker/tunnelbroker-context.js'; import type { DBOpsEntry } from '../types/db-ops-types.js'; import { type MessageProcessed, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getContentSigningKey } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { useDispatch, useSelector } from '../utils/redux-utils.js'; function DBOpsHandler(): React.Node { const { sqliteAPI } = getConfig(); const { processDBStoreOperations } = sqliteAPI; const queueFront = useSelector(state => state.dbOpsStore.queuedOps[0]); const prevQueueFront = React.useRef(null); const { sendMessageToDevice } = useTunnelbroker(); const { processOutboundMessages } = usePeerToPeerCommunication(); const dispatch = useDispatch(); React.useEffect(() => { if (!queueFront || prevQueueFront.current === queueFront) { return; } prevQueueFront.current = queueFront; - const { ops, dispatchMetadata } = queueFront; + const { ops, dispatchMetadata, notificationsCreationData } = queueFront; void (async () => { if (ops) { await processDBStoreOperations(ops); if (ops.outboundP2PMessages && ops.outboundP2PMessages.length > 0) { const messageIDs = ops.outboundP2PMessages.map( message => message.messageID, ); - processOutboundMessages(messageIDs, dispatchMetadata?.dmOpID); + processOutboundMessages( + messageIDs, + dispatchMetadata?.dmOpID, + notificationsCreationData, + ); } } dispatch({ type: opsProcessingFinishedActionType, }); if (dispatchMetadata) { try { const deviceID = await getContentSigningKey(); const { messageID, senderDeviceID } = dispatchMetadata; if (!messageID || !senderDeviceID) { return; } const message: MessageProcessed = { type: peerToPeerMessageTypes.MESSAGE_PROCESSED, messageID, deviceID, }; await sendMessageToDevice({ deviceID: senderDeviceID, payload: JSON.stringify(message), }); await sqliteAPI.removeInboundP2PMessages([messageID]); } catch (e) { console.log( `Error while sending confirmation: ${ getMessageForException(e) ?? 'unknown error' }`, ); } } })(); }, [ queueFront, dispatch, processDBStoreOperations, sendMessageToDevice, sqliteAPI, processOutboundMessages, ]); return null; } export { DBOpsHandler }; diff --git a/lib/push/send-hooks.react.js b/lib/push/send-hooks.react.js index 7e5e73cd4..27c2f5cad 100644 --- a/lib/push/send-hooks.react.js +++ b/lib/push/send-hooks.react.js @@ -1,199 +1,202 @@ // @flow import * as React from 'react'; import uuid from 'uuid'; import { preparePushNotifs, type PerUserTargetedNotifications, } from './send-utils.js'; import { ENSCacheContext } from '../components/ens-cache-provider.react.js'; import { NeynarClientContext } from '../components/neynar-client-provider.react.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; import { thickRawThreadInfosSelector } from '../selectors/thread-selectors.js'; import { useTunnelbroker } from '../tunnelbroker/tunnelbroker-context.js'; import type { TargetedAPNsNotification, TargetedAndroidNotification, TargetedWebNotification, TargetedWNSNotification, NotificationsCreationData, } from '../types/notif-types.js'; import { deviceToTunnelbrokerMessageTypes } from '../types/tunnelbroker/messages.js'; import type { TunnelbrokerAPNsNotif, TunnelbrokerFCMNotif, TunnelbrokerWebPushNotif, TunnelbrokerWNSNotif, } from '../types/tunnelbroker/notif-types.js'; import { getConfig } from '../utils/config.js'; import { getContentSigningKey } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { useSelector } from '../utils/redux-utils.js'; function apnsNotifToTunnelbrokerAPNsNotif( targetedNotification: TargetedAPNsNotification, ): TunnelbrokerAPNsNotif { const { deliveryID: deviceID, notification: { headers, ...payload }, } = targetedNotification; const newHeaders = { ...headers, 'apns-push-type': 'Alert', }; return { type: deviceToTunnelbrokerMessageTypes.TUNNELBROKER_APNS_NOTIF, deviceID, headers: JSON.stringify(newHeaders), payload: JSON.stringify(payload), clientMessageID: uuid.v4(), }; } function androidNotifToTunnelbrokerFCMNotif( targetedNotification: TargetedAndroidNotification, ): TunnelbrokerFCMNotif { const { deliveryID: deviceID, notification: { data }, priority, } = targetedNotification; return { type: deviceToTunnelbrokerMessageTypes.TUNNELBROKER_FCM_NOTIF, deviceID, clientMessageID: uuid.v4(), data: JSON.stringify(data), priority: priority === 'normal' ? 'NORMAL' : 'HIGH', }; } function webNotifToTunnelbrokerWebPushNotif( targetedNotification: TargetedWebNotification, ): TunnelbrokerWebPushNotif { const { deliveryID: deviceID, notification } = targetedNotification; return { type: deviceToTunnelbrokerMessageTypes.TUNNELBROKER_WEB_PUSH_NOTIF, deviceID, clientMessageID: uuid.v4(), payload: JSON.stringify(notification), }; } function wnsNotifToTunnelbrokerWNSNofif( targetedNotification: TargetedWNSNotification, ): TunnelbrokerWNSNotif { const { deliveryID: deviceID, notification } = targetedNotification; return { type: deviceToTunnelbrokerMessageTypes.TUNNELBROKER_WNS_NOTIF, deviceID, clientMessageID: uuid.v4(), payload: JSON.stringify(notification), }; } function useSendPushNotifs(): ( - notifCreationData: NotificationsCreationData, + notifCreationData: ?NotificationsCreationData, ) => Promise { const rawMessageInfos = useSelector(state => state.messageStore.messages); const thickRawThreadInfos = useSelector(thickRawThreadInfosSelector); const auxUserInfos = useSelector(state => state.auxUserStore.auxUserInfos); const userInfos = useSelector(state => state.userStore.userInfos); const { getENSNames } = React.useContext(ENSCacheContext); const getFCNames = React.useContext(NeynarClientContext)?.getFCNames; const { createOlmSessionsWithPeer: olmSessionCreator } = usePeerOlmSessionsCreatorContext(); const { sendNotif } = useTunnelbroker(); const { encryptedNotifUtilsAPI } = getConfig(); return React.useCallback( - async (notifCreationData: NotificationsCreationData) => { + async (notifCreationData: ?NotificationsCreationData) => { + if (!notifCreationData) { + return; + } const deviceID = await getContentSigningKey(); const senderDeviceDescriptor = { senderDeviceID: deviceID }; const { messageDatas } = notifCreationData; const pushNotifsPreparationInput = { encryptedNotifUtilsAPI, senderDeviceDescriptor, olmSessionCreator, messageInfos: rawMessageInfos, thickRawThreadInfos, auxUserInfos, messageDatas, userInfos, getENSNames, getFCNames, }; const preparedPushNotifs = await preparePushNotifs( pushNotifsPreparationInput, ); if (!preparedPushNotifs) { return; } const sendPromises = []; for (const userID in preparedPushNotifs) { for (const notif of preparedPushNotifs[userID]) { if (notif.targetedNotification.notification.encryptionFailed) { continue; } let tunnelbrokerNotif; if (notif.platform === 'ios' || notif.platform === 'macos') { tunnelbrokerNotif = apnsNotifToTunnelbrokerAPNsNotif( notif.targetedNotification, ); } else if (notif.platform === 'android') { tunnelbrokerNotif = androidNotifToTunnelbrokerFCMNotif( notif.targetedNotification, ); } else if (notif.platform === 'web') { tunnelbrokerNotif = webNotifToTunnelbrokerWebPushNotif( notif.targetedNotification, ); } else if (notif.platform === 'windows') { tunnelbrokerNotif = wnsNotifToTunnelbrokerWNSNofif( notif.targetedNotification, ); } else { continue; } sendPromises.push( (async () => { try { await sendNotif(tunnelbrokerNotif); } catch (e) { console.log( `Failed to send notification to device: ${ tunnelbrokerNotif.deviceID }. Details: ${getMessageForException(e) ?? ''}`, ); } })(), ); } } await Promise.all(sendPromises); }, [ sendNotif, encryptedNotifUtilsAPI, olmSessionCreator, rawMessageInfos, thickRawThreadInfos, auxUserInfos, userInfos, getENSNames, getFCNames, ], ); } export { useSendPushNotifs }; diff --git a/lib/shared/dm-ops/process-dm-ops.js b/lib/shared/dm-ops/process-dm-ops.js index b58dcd8e3..161776fed 100644 --- a/lib/shared/dm-ops/process-dm-ops.js +++ b/lib/shared/dm-ops/process-dm-ops.js @@ -1,296 +1,305 @@ // @flow import _groupBy from 'lodash/fp/groupBy.js'; import * as React from 'react'; import uuid from 'uuid'; import { dmOpSpecs } from './dm-op-specs.js'; import type { OutboundDMOperationSpecification, DMOperationSpecification, } from './dm-op-utils.js'; import { createMessagesToPeersFromDMOp, dmOperationSpecificationTypes, } from './dm-op-utils.js'; import { useLoggedInUserInfo } from '../../hooks/account-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useDispatchWithMetadata } from '../../hooks/ops-hooks.js'; import { mergeUpdatesWithMessageInfos } from '../../reducers/message-reducer.js'; import { getAllPeerUserIDAndDeviceIDs } from '../../selectors/user-selectors.js'; import { usePeerToPeerCommunication } from '../../tunnelbroker/peer-to-peer-context.js'; import { processDMOpsActionType, queueDMOpsActionType, sendDMActionTypes, type SendDMOpsSuccessPayload, } from '../../types/dm-ops.js'; import type { LocalMessageInfo } from '../../types/message-types.js'; import type { RawThreadInfo } from '../../types/minimally-encoded-thread-permissions-types.js'; import type { DispatchMetadata } from '../../types/redux-types.js'; import type { OutboundP2PMessage } from '../../types/sqlite-types.js'; import { threadTypes } from '../../types/thread-types-enum.js'; import type { LegacyRawThreadInfo } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import { useDispatchActionPromise } from '../../utils/redux-promise-utils.js'; import { useSelector } from '../../utils/redux-utils.js'; import { messageSpecs } from '../messages/message-specs.js'; import { updateSpecs } from '../updates/update-specs.js'; function useProcessDMOperation(): ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => Promise { const fetchMessage = useGetLatestMessageEdit(); const threadInfos = useSelector(state => state.threadStore.threadInfos); const utilities = React.useMemo( () => ({ fetchMessage, threadInfos, }), [fetchMessage, threadInfos], ); const dispatchWithMetadata = useDispatchWithMetadata(); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const currentUserInfo = useSelector(state => state.currentUserInfo); return React.useCallback( async ( dmOperationSpecification: DMOperationSpecification, dmOpID: ?string, ) => { if (!viewerID) { console.log('ignored DMOperation because logged out'); return; } let outboundP2PMessages: ?$ReadOnlyArray = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND ) { outboundP2PMessages = await createMessagesToPeersFromDMOp( dmOperationSpecification, allPeerUserIDAndDeviceIDs, currentUserInfo, ); } let dispatchMetadata: ?DispatchMetadata = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && dmOpID ) { dispatchMetadata = { dmOpID, }; } else if ( dmOperationSpecification.type === dmOperationSpecificationTypes.INBOUND ) { dispatchMetadata = dmOperationSpecification.metadata; } const { op: dmOp } = dmOperationSpecification; const processingCheckResult = dmOpSpecs[dmOp.type].canBeProcessed( dmOp, viewerID, utilities, ); if (!processingCheckResult.isProcessingPossible) { if (processingCheckResult.reason.type === 'missing_thread') { dispatchWithMetadata( { type: queueDMOpsActionType, payload: { operation: dmOp, threadID: processingCheckResult.reason.threadID, timestamp: Date.now(), }, }, dispatchMetadata, ); } return; } - const { rawMessageInfos, updateInfos } = await dmOpSpecs[ - dmOp.type - ].processDMOperation(dmOp, viewerID, utilities); + + const dmOpSpec = dmOpSpecs[dmOp.type]; + const notificationsCreationDataPromise = (async () => { + return await dmOpSpec.notificationsCreationData?.(dmOp, utilities); + })(); + + const [{ rawMessageInfos, updateInfos }, notificationsCreationData] = + await Promise.all([ + dmOpSpec.processDMOperation(dmOp, viewerID, utilities), + notificationsCreationDataPromise, + ]); const { rawMessageInfos: allNewMessageInfos } = mergeUpdatesWithMessageInfos(rawMessageInfos, updateInfos); const messagesByThreadID = _groupBy(message => message.threadID)( allNewMessageInfos, ); const updatedThreadInfosByThreadID: { [string]: RawThreadInfo | LegacyRawThreadInfo, } = {}; for (const threadID in messagesByThreadID) { updatedThreadInfosByThreadID[threadID] = threadInfos[threadID]; } for (const update of updateInfos) { const updatedThreadInfo = updateSpecs[ update.type ].getUpdatedThreadInfo?.(update, updatedThreadInfosByThreadID); if ( updatedThreadInfo && updatedThreadInfo?.type === threadTypes.THICK_SIDEBAR ) { updatedThreadInfosByThreadID[updatedThreadInfo.id] = updatedThreadInfo; } } for (const threadID in messagesByThreadID) { const repliesCountIncreasingMessages = messagesByThreadID[ threadID ].filter(message => messageSpecs[message.type].includedInRepliesCount); if (repliesCountIncreasingMessages.length > 0) { const threadInfo = updatedThreadInfosByThreadID[threadID]; const repliesCountIncreaseTime = Math.max( repliesCountIncreasingMessages.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time: repliesCountIncreaseTime, threadInfo: { ...threadInfo, repliesCount: threadInfo.repliesCount + repliesCountIncreasingMessages.length, }, }); } const messagesFromOtherPeers = messagesByThreadID[threadID].filter( message => message.creatorID !== viewerID, ); if (messagesFromOtherPeers.length === 0) { continue; } // We take the most recent timestamp to make sure that updates older // than it won't flip the status to read. const time = Math.max( messagesFromOtherPeers.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: uuid.v4(), time, threadID, unread: true, }); } let messageIDWithoutAutoRetry: ?string = null; if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && !dmOpSpecs[dmOperationSpecification.op.type].supportsAutoRetry ) { messageIDWithoutAutoRetry = dmOperationSpecification.op.messageID; } dispatchWithMetadata( { type: processDMOpsActionType, payload: { rawMessageInfos, updateInfos, outboundP2PMessages, messageIDWithoutAutoRetry, + notificationsCreationData, }, }, dispatchMetadata, ); }, [ viewerID, utilities, dispatchWithMetadata, allPeerUserIDAndDeviceIDs, currentUserInfo, threadInfos, ], ); } function useProcessAndSendDMOperation(): ( dmOperationSpecification: OutboundDMOperationSpecification, ) => Promise { const processDMOps = useProcessDMOperation(); const dispatchActionPromise = useDispatchActionPromise(); const { getDMOpsSendingPromise } = usePeerToPeerCommunication(); return React.useCallback( async (dmOperationSpecification: OutboundDMOperationSpecification) => { const { promise, dmOpID } = getDMOpsSendingPromise(); await processDMOps(dmOperationSpecification, dmOpID); if ( dmOperationSpecification.type === dmOperationSpecificationTypes.OUTBOUND && !dmOpSpecs[dmOperationSpecification.op.type].supportsAutoRetry && dmOperationSpecification.op.messageID ) { const messageID: string = dmOperationSpecification.op.messageID; const sendingPromise: Promise = (async () => { const outboundP2PMessageIDs = await promise; return { messageID, outboundP2PMessageIDs, }; })(); void dispatchActionPromise( sendDMActionTypes, sendingPromise, undefined, { messageID, }, ); } }, [dispatchActionPromise, getDMOpsSendingPromise, processDMOps], ); } function useRetrySendDMOperation(): ( messageID: string, localMessageInfo: LocalMessageInfo, ) => Promise { const { processOutboundMessages, getDMOpsSendingPromise } = usePeerToPeerCommunication(); const dispatchActionPromise = useDispatchActionPromise(); return React.useCallback( async (messageID: string, localMessageInfo: LocalMessageInfo) => { const { promise, dmOpID } = getDMOpsSendingPromise(); processOutboundMessages(localMessageInfo.outboundP2PMessageIDs, dmOpID); const sendingPromise: Promise = (async () => { const outboundP2PMessageIDs = await promise; return { messageID, outboundP2PMessageIDs, }; })(); void dispatchActionPromise(sendDMActionTypes, sendingPromise, undefined, { messageID, }); }, [dispatchActionPromise, getDMOpsSendingPromise, processOutboundMessages], ); } export { useProcessDMOperation, useProcessAndSendDMOperation, useRetrySendDMOperation, }; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index 27533b2bf..10008936e 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,369 +1,390 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; +import { useSendPushNotifs } from '../push/send-hooks.react.js'; import { type AuthMetadata, IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; +import type { NotificationsCreationData } from '../types/notif-types.js'; import { type OutboundP2PMessage, outboundP2PMessageStatuses, } from '../types/sqlite-types.js'; import { type EncryptedMessage, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getMessageForException } from '../utils/errors.js'; import { olmSessionErrors } from '../utils/olm-utils.js'; type PeerToPeerContextType = { +processOutboundMessages: ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, + notificationsCreationData: ?NotificationsCreationData, ) => void, +getDMOpsSendingPromise: () => { +promise: Promise<$ReadOnlyArray>, +dmOpID: string, }, +broadcastEphemeralMessage: ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => Promise, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, peerOlmSessionsCreator: (userID: string, deviceID: string) => Promise, messageIDs: ?$ReadOnlyArray, ): Promise<$ReadOnlyArray> { let authMetadata; try { authMetadata = await identityContext.getAuthMetadata(); } catch (e) { return []; } if ( !authMetadata.deviceID || !authMetadata.userID || !authMetadata.accessToken ) { return []; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); let messages; if (messageIDs) { messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); } else { const allMessages = await sqliteAPI.getAllOutboundP2PMessages(); messages = allMessages.filter(message => message.supportsAutoRetry); } const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; for (const message: OutboundP2PMessage of messages) { if (!devicesMap[message.deviceID]) { devicesMap[message.deviceID] = [message]; } else { devicesMap[message.deviceID].push(message); } } const sentMessagesMap: { [messageID: string]: boolean } = {}; const sendMessageToPeer = async ( message: OutboundP2PMessage, ): Promise => { if (!authMetadata.deviceID || !authMetadata.userID) { return; } try { const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo: { deviceID: authMetadata.deviceID, userID: authMetadata.userID, }, encryptedData: JSON.parse(message.ciphertext), }; await sendMessage( { deviceID: message.deviceID, payload: JSON.stringify(encryptedMessage), }, message.messageID, ); await sqliteAPI.markOutboundP2PMessageAsSent( message.messageID, message.deviceID, ); sentMessagesMap[message.messageID] = true; } catch (e) { console.error(e); } }; for (const peerDeviceID in devicesMap) { for (const message of devicesMap[peerDeviceID]) { if (message.status === outboundP2PMessageStatuses.persisted) { try { const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (e) { if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { console.log(`Error sending messages to peer ${peerDeviceID}`, e); break; } try { await peerOlmSessionsCreator(message.userID, peerDeviceID); const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (err) { console.log(`Error sending messages to peer ${peerDeviceID}`, err); break; } } } else if (message.status === outboundP2PMessageStatuses.encrypted) { await sendMessageToPeer(message); } else if (message.status === outboundP2PMessageStatuses.sent) { // Handle edge-case when message was sent, but it wasn't updated // in the message store. sentMessagesMap[message.messageID] = true; } } } return Object.keys(sentMessagesMap); } const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; function PeerToPeerProvider(props: Props): React.Node { const { children } = props; const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const dmOpsSendingPromiseResolvers = React.useRef< Map< string, { +resolve: (messageIDs: $ReadOnlyArray) => mixed, +reject: Error => mixed, }, >, >(new Map()); // This returns a promise that will be resolved with arrays of successfully // sent messages, so in case of failing all messages (e.g. no internet // connection) it will still resolve but with an empty array. const getDMOpsSendingPromise = React.useCallback(() => { const dmOpID = uuid.v4(); const promise = new Promise<$ReadOnlyArray>((resolve, reject) => { dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); }); return { promise, dmOpID }; }, []); const processingQueue = React.useRef< Array<{ +outboundMessageIDs: ?$ReadOnlyArray, +dmOpID: ?string, + +notificationsCreationData: ?NotificationsCreationData, }>, >([]); const promiseRunning = React.useRef(false); const { createOlmSessionsWithPeer: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); + const sendPushNotifs = useSendPushNotifs(); const processOutboundMessages = React.useCallback( - (outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string) => { - processingQueue.current.push({ outboundMessageIDs, dmOpID }); + ( + outboundMessageIDs: ?$ReadOnlyArray, + dmOpID: ?string, + notificationsCreationData: ?NotificationsCreationData, + ) => { + processingQueue.current.push({ + outboundMessageIDs, + dmOpID, + notificationsCreationData, + }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { const queueFront = processingQueue.current.shift(); try { - const sentMessagesIDs = await processOutboundP2PMessages( - sendMessageToDevice, - identityContext, - peerOlmSessionsCreator, - queueFront?.outboundMessageIDs, - ); + const [sentMessagesIDs] = await Promise.all([ + processOutboundP2PMessages( + sendMessageToDevice, + identityContext, + peerOlmSessionsCreator, + queueFront?.outboundMessageIDs, + ), + sendPushNotifs(notificationsCreationData), + ]); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.resolve?.(sentMessagesIDs); } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.reject?.(e); } } finally { if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); } } } while (processingQueue.current.length > 0); promiseRunning.current = false; })(); } }, - [peerOlmSessionsCreator, identityContext, sendMessageToDevice], + [ + sendPushNotifs, + peerOlmSessionsCreator, + identityContext, + sendMessageToDevice, + ], ); const broadcastEphemeralMessage = React.useCallback( async ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => { const { userID: thisUserID, deviceID: thisDeviceID } = authMetadata; if (!thisDeviceID || !thisUserID) { throw new Error('No auth metadata'); } const { olmAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); // We want it distinct by device ID to avoid potentially creating // multiple Olm sessions with the same device simultaneously. const recipientsDistinctByDeviceID = [ ...new Map(recipients.map(item => [item.deviceID, item])).values(), ]; const senderInfo = { deviceID: thisDeviceID, userID: thisUserID }; const promises = recipientsDistinctByDeviceID.map(async recipient => { try { const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessageToDevice({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); } catch (e) { if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { console.log( `Error sending messages to peer ${recipient.deviceID}`, e, ); return; } try { await peerOlmSessionsCreator(recipient.userID, recipient.deviceID); const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessageToDevice({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); } catch (err) { console.warn( `Error sending Olm-encrypted message to device ${recipient.deviceID}:`, err, ); } } }); await Promise.all(promises); }, [peerOlmSessionsCreator, sendMessageToDevice], ); React.useEffect(() => { const intervalID = setInterval( processOutboundMessages, AUTOMATIC_RETRY_FREQUENCY, ); return () => clearInterval(intervalID); }, [processOutboundMessages]); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, getDMOpsSendingPromise, broadcastEphemeralMessage, }), [ broadcastEphemeralMessage, processOutboundMessages, getDMOpsSendingPromise, ], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication };