diff --git a/lib/shared/dm-ops/dm-op-utils.js b/lib/shared/dm-ops/dm-op-utils.js index bc7a21c2b..d3a03b7a7 100644 --- a/lib/shared/dm-ops/dm-op-utils.js +++ b/lib/shared/dm-ops/dm-op-utils.js @@ -1,90 +1,111 @@ // @flow import uuid from 'uuid'; +import type { MessageSourceMetadata } from '../../types/db-ops-types.js'; import type { DMOperation } from '../../types/dm-ops.js'; import { outboundP2PMessageStatuses, type OutboundP2PMessage, } from '../../types/sqlite-types.js'; import { type DMOperationP2PMessage, userActionsP2PMessageTypes, } from '../../types/tunnelbroker/user-actions-peer-to-peer-message-types.js'; import type { CurrentUserInfo } from '../../types/user-types.js'; import { getContentSigningKey } from '../../utils/crypto-utils.js'; function generateMessagesToPeers( message: DMOperation, peers: $ReadOnlyArray<{ +userID: string, +deviceID: string, }>, supportsAutoRetry: boolean, ): $ReadOnlyArray { const opMessage: DMOperationP2PMessage = { type: userActionsP2PMessageTypes.DM_OPERATION, op: message, }; const plaintext = JSON.stringify(opMessage); const outboundP2PMessages = []; for (const peer of peers) { const messageToPeer: OutboundP2PMessage = { messageID: uuid.v4(), deviceID: peer.deviceID, userID: peer.userID, timestamp: new Date().getTime().toString(), plaintext, ciphertext: '', status: outboundP2PMessageStatuses.persisted, supportsAutoRetry, }; outboundP2PMessages.push(messageToPeer); } return outboundP2PMessages; } -export type DMOperationSpecification = { +export const dmOperationSpecificationTypes = Object.freeze({ + OUTBOUND: 'OutboundDMOperationSpecification', + INBOUND: 'InboundDMOperationSpecification', +}); + +// The operation generated on the sending client, causes changes to +// the state and broadcasting information to peers. +export type OutboundDMOperationSpecification = { + +type: 'OutboundDMOperationSpecification', +op: DMOperation, +supportsAutoRetry: boolean, +recipients: | { +type: 'all_peer_devices' | 'self_devices' } | { +type: 'some_users', +userIDs: $ReadOnlyArray }, }; +// The operation received from other peers, causes changes to +// the state and after processing, sends confirmation to the sender. +export type InboundDMOperationSpecification = { + +type: 'InboundDMOperationSpecification', + +op: DMOperation, + +metadata: ?MessageSourceMetadata, +}; + +export type DMOperationSpecification = + | OutboundDMOperationSpecification + | InboundDMOperationSpecification; + async function createMessagesToPeersFromDMOp( - operation: DMOperationSpecification, + operation: OutboundDMOperationSpecification, allPeerUserIDAndDeviceIDs: $ReadOnlyArray<{ +userID: string, +deviceID: string, }>, currentUserInfo: ?CurrentUserInfo, ): Promise<$ReadOnlyArray> { if (!currentUserInfo?.id) { return []; } let peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs; if (operation.recipients.type === 'self_devices') { peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter( peer => peer.userID === currentUserInfo.id, ); } else if (operation.recipients.type === 'some_users') { const userIDs = new Set(operation.recipients.userIDs); peerUserIDAndDeviceIDs = allPeerUserIDAndDeviceIDs.filter(peer => userIDs.has(peer.userID), ); } const thisDeviceID = await getContentSigningKey(); const targetPeers = peerUserIDAndDeviceIDs.filter( peer => peer.deviceID !== thisDeviceID, ); return generateMessagesToPeers( operation.op, targetPeers, operation.supportsAutoRetry, ); } export { createMessagesToPeersFromDMOp }; diff --git a/lib/shared/dm-ops/process-dm-ops.js b/lib/shared/dm-ops/process-dm-ops.js index ac8b7ec5d..6589d5443 100644 --- a/lib/shared/dm-ops/process-dm-ops.js +++ b/lib/shared/dm-ops/process-dm-ops.js @@ -1,173 +1,173 @@ // @flow import _groupBy from 'lodash/fp/groupBy.js'; import * as React from 'react'; import uuid from 'uuid'; import { dmOpSpecs } from './dm-op-specs.js'; -import type { DMOperationSpecification } from './dm-op-utils.js'; +import type { OutboundDMOperationSpecification } from './dm-op-utils.js'; import { useLoggedInUserInfo } from '../../hooks/account-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useDispatchWithMessageSource } from '../../hooks/ops-hooks.js'; import { mergeUpdatesWithMessageInfos } from '../../reducers/message-reducer.js'; import { usePeerToPeerCommunication } from '../../tunnelbroker/peer-to-peer-context.js'; import type { MessageSourceMetadata } from '../../types/db-ops-types.js'; import { type DMOperation, processDMOpsActionType, queueDMOpsActionType, } from '../../types/dm-ops.js'; import type { RawThreadInfo } from '../../types/minimally-encoded-thread-permissions-types.js'; import { threadTypes } from '../../types/thread-types-enum.js'; import type { LegacyRawThreadInfo } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import { useSelector } from '../../utils/redux-utils.js'; import { messageSpecs } from '../messages/message-specs.js'; import { updateSpecs } from '../updates/update-specs.js'; function useProcessDMOperation(): ( dmOp: DMOperation, metadata: ?MessageSourceMetadata, ) => Promise { const fetchMessage = useGetLatestMessageEdit(); const threadInfos = useSelector(state => state.threadStore.threadInfos); const utilities = React.useMemo( () => ({ fetchMessage, threadInfos, }), [fetchMessage, threadInfos], ); const dispatchWithMessageSource = useDispatchWithMessageSource(); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; return React.useCallback( async (dmOp: DMOperation, metadata: ?MessageSourceMetadata) => { if (!viewerID) { console.log('ignored DMOperation because logged out'); return; } const processingCheckResult = dmOpSpecs[dmOp.type].canBeProcessed( dmOp, viewerID, utilities, ); if (!processingCheckResult.isProcessingPossible) { if (processingCheckResult.reason.type === 'missing_thread') { dispatchWithMessageSource( { type: queueDMOpsActionType, payload: { operation: dmOp, threadID: processingCheckResult.reason.threadID, timestamp: Date.now(), }, }, metadata, ); } return; } const { rawMessageInfos, updateInfos } = await dmOpSpecs[ dmOp.type ].processDMOperation(dmOp, viewerID, utilities); const { rawMessageInfos: allNewMessageInfos } = mergeUpdatesWithMessageInfos(rawMessageInfos, updateInfos); const messagesByThreadID = _groupBy(message => message.threadID)( allNewMessageInfos, ); const updatedThreadInfosByThreadID: { [string]: RawThreadInfo | LegacyRawThreadInfo, } = {}; for (const threadID in messagesByThreadID) { updatedThreadInfosByThreadID[threadID] = threadInfos[threadID]; } for (const update of updateInfos) { const updatedThreadInfo = updateSpecs[ update.type ].getUpdatedThreadInfo?.(update, updatedThreadInfosByThreadID); if ( updatedThreadInfo && updatedThreadInfo?.type === threadTypes.THICK_SIDEBAR ) { updatedThreadInfosByThreadID[updatedThreadInfo.id] = updatedThreadInfo; } } for (const threadID in messagesByThreadID) { const repliesCountIncreasingMessages = messagesByThreadID[ threadID ].filter(message => messageSpecs[message.type].includedInRepliesCount); if (repliesCountIncreasingMessages.length > 0) { const threadInfo = updatedThreadInfosByThreadID[threadID]; const repliesCountIncreaseTime = Math.max( repliesCountIncreasingMessages.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time: repliesCountIncreaseTime, threadInfo: { ...threadInfo, repliesCount: threadInfo.repliesCount + repliesCountIncreasingMessages.length, }, }); } const messagesFromOtherPeers = messagesByThreadID[threadID].filter( message => message.creatorID !== viewerID, ); if (messagesFromOtherPeers.length === 0) { continue; } // We take the most recent timestamp to make sure that updates older // than it won't flip the status to read. const time = Math.max( messagesFromOtherPeers.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: uuid.v4(), time, threadID, unread: true, }); } dispatchWithMessageSource( { type: processDMOpsActionType, payload: { rawMessageInfos, updateInfos, }, }, metadata, ); }, [viewerID, utilities, dispatchWithMessageSource, threadInfos], ); } function useProcessAndSendDMOperation(): ( - dmOperationSpecification: DMOperationSpecification, + dmOperationSpecification: OutboundDMOperationSpecification, ) => Promise { const processDMOps = useProcessDMOperation(); const { sendDMOperation } = usePeerToPeerCommunication(); return React.useCallback( - async (dmOperationSpecification: DMOperationSpecification) => { + async (dmOperationSpecification: OutboundDMOperationSpecification) => { await processDMOps(dmOperationSpecification.op); await sendDMOperation(dmOperationSpecification); }, [processDMOps, sendDMOperation], ); } export { useProcessDMOperation, useProcessAndSendDMOperation }; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js index 1bdd50ff5..cc6d62b01 100644 --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -1,362 +1,362 @@ // @flow import invariant from 'invariant'; import * as React from 'react'; import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; import { getAllPeerUserIDAndDeviceIDs } from '../selectors/user-selectors.js'; import { createMessagesToPeersFromDMOp, - type DMOperationSpecification, + type OutboundDMOperationSpecification, } from '../shared/dm-ops/dm-op-utils.js'; import { type AuthMetadata, IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; import { scheduleP2PMessagesActionType } from '../types/dm-ops.js'; import { type OutboundP2PMessage, outboundP2PMessageStatuses, } from '../types/sqlite-types.js'; import { type EncryptedMessage, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getMessageForException } from '../utils/errors.js'; import { olmSessionErrors } from '../utils/olm-utils.js'; import { useDispatch, useSelector } from '../utils/redux-utils.js'; type PeerToPeerContextType = { +processOutboundMessages: ( outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string, ) => void, - +sendDMOperation: (op: DMOperationSpecification) => Promise, + +sendDMOperation: (op: OutboundDMOperationSpecification) => Promise, +broadcastEphemeralMessage: ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => Promise, }; const PeerToPeerContext: React.Context = React.createContext(); type Props = { +children: React.Node, }; async function processOutboundP2PMessages( sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, identityContext: IdentityClientContextType, peerOlmSessionsCreator: (userID: string, deviceID: string) => Promise, messageIDs: ?$ReadOnlyArray, ): Promise { const authMetadata = await identityContext.getAuthMetadata(); if ( !authMetadata.deviceID || !authMetadata.userID || !authMetadata.accessToken ) { return; } const { olmAPI, sqliteAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); let messages; if (messageIDs) { messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs); } else { const allMessages = await sqliteAPI.getAllOutboundP2PMessages(); messages = allMessages.filter(message => message.supportsAutoRetry); } const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; for (const message: OutboundP2PMessage of messages) { if (!devicesMap[message.deviceID]) { devicesMap[message.deviceID] = [message]; } else { devicesMap[message.deviceID].push(message); } } const sendMessageToPeer = async ( message: OutboundP2PMessage, ): Promise => { if (!authMetadata.deviceID || !authMetadata.userID) { return; } const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo: { deviceID: authMetadata.deviceID, userID: authMetadata.userID, }, encryptedData: JSON.parse(message.ciphertext), }; await sendMessage( { deviceID: message.deviceID, payload: JSON.stringify(encryptedMessage), }, message.messageID, ); await sqliteAPI.markOutboundP2PMessageAsSent( message.messageID, message.deviceID, ); }; for (const peerDeviceID in devicesMap) { for (const message of devicesMap[peerDeviceID]) { if (message.status === outboundP2PMessageStatuses.persisted) { try { const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (e) { if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { console.log(`Error sending messages to peer ${peerDeviceID}`, e); break; } try { await peerOlmSessionsCreator(message.userID, peerDeviceID); const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; await sendMessageToPeer(encryptedMessage); } catch (err) { console.log(`Error sending messages to peer ${peerDeviceID}`, err); break; } } } else if (message.status === outboundP2PMessageStatuses.encrypted) { await sendMessageToPeer(message); } } } } const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000; function PeerToPeerProvider(props: Props): React.Node { const { children } = props; const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const dispatch = useDispatch(); const dmOpsSendingPromiseResolvers = React.useRef< Map mixed, +reject: Error => mixed }>, >(new Map()); const allPeerUserIDAndDeviceIDs = useSelector(getAllPeerUserIDAndDeviceIDs); const currentUserInfo = useSelector(state => state.currentUserInfo); const sendDMOperation = React.useCallback( - async (op: DMOperationSpecification) => { + async (op: OutboundDMOperationSpecification) => { const dmOpID = uuid.v4(); const promise = new Promise((resolve, reject) => { dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); }); const messages = await createMessagesToPeersFromDMOp( op, allPeerUserIDAndDeviceIDs, currentUserInfo, ); dispatch({ type: scheduleP2PMessagesActionType, payload: { dmOpID, messages, }, }); return promise; }, [allPeerUserIDAndDeviceIDs, currentUserInfo, dispatch], ); const processingQueue = React.useRef< Array<{ +outboundMessageIDs: ?$ReadOnlyArray, +dmOpID: ?string, }>, >([]); const promiseRunning = React.useRef(false); const { createOlmSessionsWithPeer: peerOlmSessionsCreator } = usePeerOlmSessionsCreatorContext(); const processOutboundMessages = React.useCallback( (outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string) => { processingQueue.current.push({ outboundMessageIDs, dmOpID }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { const queueFront = processingQueue.current.shift(); try { await processOutboundP2PMessages( sendMessageToDevice, identityContext, peerOlmSessionsCreator, queueFront?.outboundMessageIDs, ); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.resolve?.(); } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current .get(queueFront.dmOpID) ?.reject?.(e); } } finally { if (queueFront.dmOpID) { dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); } } } while (processingQueue.current.length > 0); promiseRunning.current = false; })(); } }, [peerOlmSessionsCreator, identityContext, sendMessageToDevice], ); const broadcastEphemeralMessage = React.useCallback( async ( contentPayload: string, recipients: $ReadOnlyArray<{ +userID: string, +deviceID: string }>, authMetadata: AuthMetadata, ) => { const { userID: thisUserID, deviceID: thisDeviceID } = authMetadata; if (!thisDeviceID || !thisUserID) { throw new Error('No auth metadata'); } const { olmAPI } = getConfig(); await olmAPI.initializeCryptoAccount(); // We want it distinct by device ID to avoid potentially creating // multiple Olm sessions with the same device simultaneously. const recipientsDistinctByDeviceID = [ ...new Map(recipients.map(item => [item.deviceID, item])).values(), ]; const senderInfo = { deviceID: thisDeviceID, userID: thisUserID }; const promises = recipientsDistinctByDeviceID.map(async recipient => { try { const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessageToDevice({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); } catch (e) { if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { console.log( `Error sending messages to peer ${recipient.deviceID}`, e, ); return; } try { await peerOlmSessionsCreator(recipient.userID, recipient.deviceID); const encryptedData = await olmAPI.encrypt( contentPayload, recipient.deviceID, ); const encryptedMessage: EncryptedMessage = { type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, senderInfo, encryptedData, }; await sendMessageToDevice({ deviceID: recipient.deviceID, payload: JSON.stringify(encryptedMessage), }); } catch (err) { console.warn( `Error sending Olm-encrypted message to device ${recipient.deviceID}:`, err, ); } } }); await Promise.all(promises); }, [peerOlmSessionsCreator, sendMessageToDevice], ); React.useEffect(() => { const intervalID = setInterval( processOutboundMessages, AUTOMATIC_RETRY_FREQUENCY, ); return () => clearInterval(intervalID); }, [processOutboundMessages]); const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, sendDMOperation, broadcastEphemeralMessage, }), [broadcastEphemeralMessage, processOutboundMessages, sendDMOperation], ); return ( {children} ); } function usePeerToPeerCommunication(): PeerToPeerContextType { const context = React.useContext(PeerToPeerContext); invariant(context, 'PeerToPeerContext not found'); return context; } export { PeerToPeerProvider, usePeerToPeerCommunication };