diff --git a/lib/shared/dm-ops/dm-ops-queue-handler.react.js b/lib/shared/dm-ops/dm-ops-queue-handler.react.js index 784c5efaf..b3d1834b2 100644 --- a/lib/shared/dm-ops/dm-ops-queue-handler.react.js +++ b/lib/shared/dm-ops/dm-ops-queue-handler.react.js @@ -1,77 +1,87 @@ // @flow import * as React from 'react'; +import { dmOperationSpecificationTypes } from './dm-op-utils.js'; import { useProcessDMOperation } from './process-dm-ops.js'; import { threadInfoSelector } from '../../selectors/thread-selectors.js'; import { clearQueuedThreadDMOpsActionType, pruneDMOpsQueueActionType, } from '../../types/dm-ops.js'; import { useDispatch, useSelector } from '../../utils/redux-utils.js'; const PRUNING_FREQUENCY = 60 * 60 * 1000; const FIRST_PRUNING_DELAY = 10 * 60 * 1000; const QUEUED_OPERATION_TTL = 3 * 24 * 60 * 60 * 1000; function DMOpsQueueHandler(): React.Node { const dispatch = useDispatch(); const prune = React.useCallback(() => { const now = Date.now(); dispatch({ type: pruneDMOpsQueueActionType, payload: { pruneMaxTimestamp: now - QUEUED_OPERATION_TTL, }, }); }, [dispatch]); React.useEffect(() => { const timeoutID = setTimeout(prune, FIRST_PRUNING_DELAY); const intervalID = setInterval(prune, PRUNING_FREQUENCY); return () => { clearTimeout(timeoutID); clearInterval(intervalID); }; }, [prune]); const threadInfos = useSelector(threadInfoSelector); const threadIDs = React.useMemo( () => new Set(Object.keys(threadInfos)), [threadInfos], ); const prevThreadIDsRef = React.useRef<$ReadOnlySet>(new Set()); const queuedOperations = useSelector( state => state.queuedDMOperations.operations, ); const processDMOperation = useProcessDMOperation(); React.useEffect(() => { const prevThreadIDs = prevThreadIDsRef.current; prevThreadIDsRef.current = threadIDs; for (const threadID in queuedOperations) { if (!threadIDs.has(threadID) || prevThreadIDs.has(threadID)) { continue; } for (const dmOp of queuedOperations[threadID]) { - void processDMOperation(dmOp.operation); + void processDMOperation({ + // This is `INBOUND` because we assume that when generating + // `dmOperationSpecificationTypes.OUBOUND` it should be possible + // to be processed and never queued up. + type: dmOperationSpecificationTypes.INBOUND, + op: dmOp.operation, + // There is no metadata, because messages were confirmed when + // adding to the queue. + metadata: null, + }); } dispatch({ type: clearQueuedThreadDMOpsActionType, payload: { threadID, }, }); } }, [dispatch, processDMOperation, queuedOperations, threadIDs]); return null; } export { DMOpsQueueHandler }; diff --git a/lib/shared/dm-ops/process-dm-ops.js b/lib/shared/dm-ops/process-dm-ops.js index 6589d5443..66aa77542 100644 --- a/lib/shared/dm-ops/process-dm-ops.js +++ b/lib/shared/dm-ops/process-dm-ops.js @@ -1,173 +1,175 @@ // @flow import _groupBy from 'lodash/fp/groupBy.js'; import * as React from 'react'; import uuid from 'uuid'; import { dmOpSpecs } from './dm-op-specs.js'; -import type { OutboundDMOperationSpecification } from './dm-op-utils.js'; +import type { + OutboundDMOperationSpecification, + DMOperationSpecification, +} from './dm-op-utils.js'; import { useLoggedInUserInfo } from '../../hooks/account-hooks.js'; import { useGetLatestMessageEdit } from '../../hooks/latest-message-edit.js'; import { useDispatchWithMessageSource } from '../../hooks/ops-hooks.js'; import { mergeUpdatesWithMessageInfos } from '../../reducers/message-reducer.js'; import { usePeerToPeerCommunication } from '../../tunnelbroker/peer-to-peer-context.js'; -import type { MessageSourceMetadata } from '../../types/db-ops-types.js'; import { - type DMOperation, processDMOpsActionType, queueDMOpsActionType, } from '../../types/dm-ops.js'; import type { RawThreadInfo } from '../../types/minimally-encoded-thread-permissions-types.js'; import { threadTypes } from '../../types/thread-types-enum.js'; import type { LegacyRawThreadInfo } from '../../types/thread-types.js'; import { updateTypes } from '../../types/update-types-enum.js'; import { useSelector } from '../../utils/redux-utils.js'; import { messageSpecs } from '../messages/message-specs.js'; import { updateSpecs } from '../updates/update-specs.js'; function useProcessDMOperation(): ( - dmOp: DMOperation, - metadata: ?MessageSourceMetadata, + dmOperationSpecification: DMOperationSpecification, ) => Promise { const fetchMessage = useGetLatestMessageEdit(); const threadInfos = useSelector(state => state.threadStore.threadInfos); const utilities = React.useMemo( () => ({ fetchMessage, threadInfos, }), [fetchMessage, threadInfos], ); const dispatchWithMessageSource = useDispatchWithMessageSource(); const loggedInUserInfo = useLoggedInUserInfo(); const viewerID = loggedInUserInfo?.id; return React.useCallback( - async (dmOp: DMOperation, metadata: ?MessageSourceMetadata) => { + async (dmOperationSpecification: DMOperationSpecification) => { if (!viewerID) { console.log('ignored DMOperation because logged out'); return; } + + const { op: dmOp, metadata } = dmOperationSpecification; const processingCheckResult = dmOpSpecs[dmOp.type].canBeProcessed( dmOp, viewerID, utilities, ); if (!processingCheckResult.isProcessingPossible) { if (processingCheckResult.reason.type === 'missing_thread') { dispatchWithMessageSource( { type: queueDMOpsActionType, payload: { operation: dmOp, threadID: processingCheckResult.reason.threadID, timestamp: Date.now(), }, }, metadata, ); } return; } const { rawMessageInfos, updateInfos } = await dmOpSpecs[ dmOp.type ].processDMOperation(dmOp, viewerID, utilities); const { rawMessageInfos: allNewMessageInfos } = mergeUpdatesWithMessageInfos(rawMessageInfos, updateInfos); const messagesByThreadID = _groupBy(message => message.threadID)( allNewMessageInfos, ); const updatedThreadInfosByThreadID: { [string]: RawThreadInfo | LegacyRawThreadInfo, } = {}; for (const threadID in messagesByThreadID) { updatedThreadInfosByThreadID[threadID] = threadInfos[threadID]; } for (const update of updateInfos) { const updatedThreadInfo = updateSpecs[ update.type ].getUpdatedThreadInfo?.(update, updatedThreadInfosByThreadID); if ( updatedThreadInfo && updatedThreadInfo?.type === threadTypes.THICK_SIDEBAR ) { updatedThreadInfosByThreadID[updatedThreadInfo.id] = updatedThreadInfo; } } for (const threadID in messagesByThreadID) { const repliesCountIncreasingMessages = messagesByThreadID[ threadID ].filter(message => messageSpecs[message.type].includedInRepliesCount); if (repliesCountIncreasingMessages.length > 0) { const threadInfo = updatedThreadInfosByThreadID[threadID]; const repliesCountIncreaseTime = Math.max( repliesCountIncreasingMessages.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD, id: uuid.v4(), time: repliesCountIncreaseTime, threadInfo: { ...threadInfo, repliesCount: threadInfo.repliesCount + repliesCountIncreasingMessages.length, }, }); } const messagesFromOtherPeers = messagesByThreadID[threadID].filter( message => message.creatorID !== viewerID, ); if (messagesFromOtherPeers.length === 0) { continue; } // We take the most recent timestamp to make sure that updates older // than it won't flip the status to read. const time = Math.max( messagesFromOtherPeers.map(message => message.time), ); updateInfos.push({ type: updateTypes.UPDATE_THREAD_READ_STATUS, id: uuid.v4(), time, threadID, unread: true, }); } dispatchWithMessageSource( { type: processDMOpsActionType, payload: { rawMessageInfos, updateInfos, }, }, metadata, ); }, [viewerID, utilities, dispatchWithMessageSource, threadInfos], ); } function useProcessAndSendDMOperation(): ( dmOperationSpecification: OutboundDMOperationSpecification, ) => Promise { const processDMOps = useProcessDMOperation(); const { sendDMOperation } = usePeerToPeerCommunication(); return React.useCallback( async (dmOperationSpecification: OutboundDMOperationSpecification) => { - await processDMOps(dmOperationSpecification.op); + await processDMOps(dmOperationSpecification); await sendDMOperation(dmOperationSpecification); }, [processDMOps, sendDMOperation], ); } export { useProcessDMOperation, useProcessAndSendDMOperation }; diff --git a/lib/tunnelbroker/use-peer-to-peer-message-handler.js b/lib/tunnelbroker/use-peer-to-peer-message-handler.js index f7e036372..38e17ff36 100644 --- a/lib/tunnelbroker/use-peer-to-peer-message-handler.js +++ b/lib/tunnelbroker/use-peer-to-peer-message-handler.js @@ -1,419 +1,424 @@ // @flow import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; import * as React from 'react'; import { useResendPeerToPeerMessages } from './use-resend-peer-to-peer-messages.js'; import { removePeerUsersActionType } from '../actions/aux-user-actions.js'; import { invalidateTunnelbrokerDeviceTokenActionType } from '../actions/tunnelbroker-actions.js'; import { logOutActionTypes, useLogOut } from '../actions/user-actions.js'; import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js'; import { useBroadcastDeviceListUpdates, useBroadcastAccountDeletion, useGetAndUpdateDeviceListsForUsers, } from '../hooks/peer-list-hooks.js'; import { getAllPeerDevices, getForeignPeerDeviceIDs, } from '../selectors/user-selectors.js'; import { verifyAndGetDeviceList, removeDeviceFromDeviceList, } from '../shared/device-list-utils.js'; +import { dmOperationSpecificationTypes } from '../shared/dm-ops/dm-op-utils.js'; import { useProcessDMOperation } from '../shared/dm-ops/process-dm-ops.js'; import { IdentityClientContext } from '../shared/identity-client-context.js'; import type { DeviceOlmInboundKeys } from '../types/identity-service-types.js'; import { peerToPeerMessageTypes, type PeerToPeerMessage, type SenderInfo, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { userActionsP2PMessageTypes, userActionP2PMessageValidator, type UserActionP2PMessage, } from '../types/tunnelbroker/user-actions-peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getContentSigningKey } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { hasHigherDeviceID, OLM_SESSION_ERROR_PREFIX, olmSessionErrors, } from '../utils/olm-utils.js'; import { getClientMessageIDFromTunnelbrokerMessageID } from '../utils/peer-to-peer-communication-utils.js'; import { useDispatchActionPromise } from '../utils/redux-promise-utils.js'; import { useDispatch, useSelector } from '../utils/redux-utils.js'; // When logout is requested by primary device, logging out of Identity Service // is already handled by the primary device const primaryRequestLogoutOptions = Object.freeze({ skipIdentityLogOut: true }); // When re-broadcasting, we want to do it only to foreign peers // to avoid a vicious circle of deletion messages sent by own devices. const accountDeletionBroadcastOptions = Object.freeze({ includeOwnDevices: false, }); // handles `peerToPeerMessageTypes.ENCRYPTED_MESSAGE` function useHandleOlmMessageToDevice(): ( decryptedMessageContent: string, senderInfo: SenderInfo, messageID: string, ) => Promise { const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const { identityClient, getAuthMetadata } = identityContext; const broadcastDeviceListUpdates = useBroadcastDeviceListUpdates(); const reBroadcastAccountDeletion = useBroadcastAccountDeletion( accountDeletionBroadcastOptions, ); const allPeerDevices = useSelector(getAllPeerDevices); const dispatch = useDispatch(); const dispatchActionPromise = useDispatchActionPromise(); const primaryDeviceRequestedLogOut = useLogOut(primaryRequestLogoutOptions); const processDMOperation = useProcessDMOperation(); return React.useCallback( async ( decryptedMessageContent: string, senderInfo: SenderInfo, messageID: string, ) => { const { sqliteAPI } = getConfig(); const parsedMessageToDevice = JSON.parse(decryptedMessageContent); // Handle user-action messages if (!userActionP2PMessageValidator.is(parsedMessageToDevice)) { return; } const userActionMessage: UserActionP2PMessage = parsedMessageToDevice; if ( userActionMessage.type === userActionsP2PMessageTypes.LOG_OUT_DEVICE ) { // causes log out, there is no need to remove Inbound P2P message void dispatchActionPromise( logOutActionTypes, primaryDeviceRequestedLogOut(), ); } else if ( userActionMessage.type === userActionsP2PMessageTypes.LOG_OUT_SECONDARY_DEVICE ) { const { userID, deviceID: deviceIDToLogOut } = senderInfo; await removeDeviceFromDeviceList( identityClient, userID, deviceIDToLogOut, ); await broadcastDeviceListUpdates( allPeerDevices.filter(deviceID => deviceID !== deviceIDToLogOut), ); await sqliteAPI.removeInboundP2PMessages([messageID]); } else if ( userActionMessage.type === userActionsP2PMessageTypes.DM_OPERATION ) { // inbound P2P message is removed in DBOpsHandler after processing - await processDMOperation(userActionMessage.op, { - messageID, - senderDeviceID: senderInfo.deviceID, + await processDMOperation({ + type: dmOperationSpecificationTypes.INBOUND, + op: userActionMessage.op, + metadata: { + messageID, + senderDeviceID: senderInfo.deviceID, + }, }); } else if ( userActionMessage.type === userActionsP2PMessageTypes.ACCOUNT_DELETION ) { const { userID: thisUserID } = await getAuthMetadata(); if (!thisUserID) { return; } // own devices re-broadcast account deletion to foreign peer devices if (senderInfo.userID === thisUserID) { await reBroadcastAccountDeletion(); // we treat account deletion the same way as primary-device-requested // logout, no need to remove Inbound P2P message void dispatchActionPromise( logOutActionTypes, primaryDeviceRequestedLogOut(), ); } else { dispatch({ type: removePeerUsersActionType, payload: { userIDs: [senderInfo.userID] }, }); await sqliteAPI.removeInboundP2PMessages([messageID]); } } else { console.warn( 'Unsupported P2P user action message:', userActionMessage.type, ); } }, [ allPeerDevices, broadcastDeviceListUpdates, dispatch, dispatchActionPromise, getAuthMetadata, identityClient, primaryDeviceRequestedLogOut, processDMOperation, reBroadcastAccountDeletion, ], ); } function usePeerToPeerMessageHandler(): ( message: PeerToPeerMessage, messageID: string, ) => Promise { const { olmAPI, sqliteAPI } = getConfig(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const { identityClient, getAuthMetadata } = identityContext; const foreignPeerDevices = useSelector(getForeignPeerDeviceIDs); const broadcastDeviceListUpdates = useBroadcastDeviceListUpdates(); const getAndUpdateDeviceListsForUsers = useGetAndUpdateDeviceListsForUsers(); const dispatch = useDispatch(); const handleOlmMessageToDevice = useHandleOlmMessageToDevice(); const resendPeerToPeerMessages = useResendPeerToPeerMessages(); const { createOlmSessionsWithPeer } = usePeerOlmSessionsCreatorContext(); return React.useCallback( async (message: PeerToPeerMessage, messageID: string) => { if (message.type === peerToPeerMessageTypes.OUTBOUND_SESSION_CREATION) { const { senderInfo, encryptedData, sessionVersion } = message; const { userID: senderUserID, deviceID: senderDeviceID } = senderInfo; let deviceKeys: ?DeviceOlmInboundKeys = null; try { const { keys } = await identityClient.getInboundKeysForUser(senderUserID); deviceKeys = keys[senderDeviceID]; } catch (e) { console.log(e.message); } if (!deviceKeys) { console.log( 'Error creating inbound session with device ' + `${senderDeviceID}: No keys for the device, ` + `session version: ${sessionVersion}`, ); return; } try { await olmAPI.initializeCryptoAccount(); const result = await olmAPI.contentInboundSessionCreator( deviceKeys.identityKeysBlob.primaryIdentityPublicKeys, encryptedData, sessionVersion, false, ); await resendPeerToPeerMessages(senderDeviceID); console.log( 'Created inbound session with device ' + `${senderDeviceID}: ${result}, ` + `session version: ${sessionVersion}`, ); } catch (e) { if (e.message?.includes(olmSessionErrors.alreadyCreated)) { console.log( 'Received session request with lower session version from ' + `${senderDeviceID}, session version: ${sessionVersion}`, ); } else if (e.message?.includes(olmSessionErrors.raceCondition)) { const currentDeviceID = await getContentSigningKey(); if (hasHigherDeviceID(currentDeviceID, senderDeviceID)) { console.log( 'Race condition while creating session with ' + `${senderDeviceID}, session version: ${sessionVersion}, ` + `this device has a higher deviceID and the session will be kept`, ); } else { const result = await olmAPI.contentInboundSessionCreator( deviceKeys.identityKeysBlob.primaryIdentityPublicKeys, encryptedData, sessionVersion, true, ); console.log( 'Overwrite session with device ' + `${senderDeviceID}: ${result}, ` + `session version: ${sessionVersion}`, ); await resendPeerToPeerMessages(senderDeviceID); } } else { console.log( 'Error creating inbound session with device ' + `${senderDeviceID}: ${e.message}, ` + `session version: ${sessionVersion}`, ); } } } else if (message.type === peerToPeerMessageTypes.ENCRYPTED_MESSAGE) { try { await olmAPI.initializeCryptoAccount(); const decrypted = await olmAPI.decryptAndPersist( message.encryptedData, message.senderInfo.deviceID, message.senderInfo.userID, messageID, ); console.log( 'Decrypted message from device ' + `${message.senderInfo.deviceID}: ${decrypted}`, ); try { await handleOlmMessageToDevice( decrypted, message.senderInfo, messageID, ); } catch (e) { console.log('Failed processing Olm P2P message:', e); } } catch (e) { if (e.message?.includes(olmSessionErrors.invalidSessionVersion)) { console.log( 'Received message decrypted with different session from ' + `${message.senderInfo.deviceID}.`, ); return; } console.log( 'Error decrypting message from device ' + `${message.senderInfo.deviceID}: ${e.message}`, ); if (!e.message?.includes(OLM_SESSION_ERROR_PREFIX)) { throw e; } await createOlmSessionsWithPeer( message.senderInfo.userID, message.senderInfo.deviceID, { overwriteContentSession: true, }, ); await resendPeerToPeerMessages(message.senderInfo.deviceID); } } else if (message.type === peerToPeerMessageTypes.REFRESH_KEY_REQUEST) { try { await olmAPI.initializeCryptoAccount(); const oneTimeKeys = await olmAPI.getOneTimeKeys(message.numberOfKeys); await identityClient.uploadOneTimeKeys(oneTimeKeys); } catch (e) { console.log(`Error uploading one-time keys: ${e.message}`); } } else if (message.type === peerToPeerMessageTypes.DEVICE_LIST_UPDATED) { try { const result = await verifyAndGetDeviceList( identityClient, message.userID, null, ); if (!result.valid) { console.log( `Received invalid device list update for user ${message.userID}. Reason: ${result.reason}`, ); } else { console.log( `Received valid device list update for user ${message.userID}`, ); } await getAndUpdateDeviceListsForUsers([message.userID]); if (result.valid && message?.signedDeviceList?.rawDeviceList) { const receivedRawList = JSON.parse( message.signedDeviceList.rawDeviceList, ); // additional check for broadcasted and Identity device // list equality const listsAreEqual = _isEqual(result.deviceList)(receivedRawList); console.log( `Identity and received device lists are ${ listsAreEqual ? '' : 'not' } equal.`, ); } } catch (e) { console.log( `Error verifying device list for user ${message.userID}: ${e}`, ); } } else if ( message.type === peerToPeerMessageTypes.IDENTITY_DEVICE_LIST_UPDATED ) { try { const { userID } = await getAuthMetadata(); if (!userID) { return; } await Promise.all([ broadcastDeviceListUpdates(foreignPeerDevices), getAndUpdateDeviceListsForUsers([userID]), ]); } catch (e) { console.log( `Error updating device list after Identity request: ${ getMessageForException(e) ?? 'unknown error' }`, ); } } else if (message.type === peerToPeerMessageTypes.MESSAGE_PROCESSED) { try { const { deviceID, messageID: tunnelbrokerMessageID } = message; const clientMessageID = getClientMessageIDFromTunnelbrokerMessageID( tunnelbrokerMessageID, ); await sqliteAPI.removeOutboundP2PMessage(clientMessageID, deviceID); } catch (e) { console.log( `Error removing message after processing: ${ getMessageForException(e) ?? 'unknown error' }`, ); } } else if (message.type === peerToPeerMessageTypes.BAD_DEVICE_TOKEN) { dispatch({ type: invalidateTunnelbrokerDeviceTokenActionType, payload: { deviceToken: message.invalidatedToken, }, }); } }, [ broadcastDeviceListUpdates, createOlmSessionsWithPeer, dispatch, foreignPeerDevices, getAndUpdateDeviceListsForUsers, getAuthMetadata, handleOlmMessageToDevice, identityClient, olmAPI, resendPeerToPeerMessages, sqliteAPI, ], ); } export { usePeerToPeerMessageHandler, useHandleOlmMessageToDevice };