diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js index 59b0238c9..6eba0272f 100644 --- a/lib/handlers/db-ops-handler.react.js +++ b/lib/handlers/db-ops-handler.react.js @@ -1,73 +1,85 @@ // @flow import * as React from 'react'; import { opsProcessingFinishedActionType } from '../actions/db-ops-actions.js'; +import { usePeerToPeerCommunication } from '../tunnelbroker/peer-to-peer-context.js'; import { useTunnelbroker } from '../tunnelbroker/tunnelbroker-context.js'; import type { DBOpsEntry } from '../types/db-ops-types.js'; import type { StoreOperations } from '../types/store-ops-types.js'; import { type MessageProcessed, peerToPeerMessageTypes, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import { getConfig } from '../utils/config.js'; import { getContentSigningKey } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; import { useDispatch, useSelector } from '../utils/redux-utils.js'; type Props = { +processDBStoreOperations: StoreOperations => Promise, }; function DBOpsHandler(props: Props): React.Node { const { sqliteAPI } = getConfig(); const { processDBStoreOperations } = props; const queueFront = useSelector(state => state.dbOpsStore.queuedOps[0]); const prevQueueFront = React.useRef(null); const { sendMessage } = useTunnelbroker(); + const { processOutboundMessages } = usePeerToPeerCommunication(); const dispatch = useDispatch(); React.useEffect(() => { if (!queueFront || prevQueueFront.current === queueFront) { return; } prevQueueFront.current = queueFront; const { ops, messageSourceMetadata } = queueFront; void (async () => { if (ops) { await processDBStoreOperations(ops); + if (ops.outboundP2PMessages && ops.outboundP2PMessages.length > 0) { + processOutboundMessages(); + } } dispatch({ type: opsProcessingFinishedActionType, }); if (messageSourceMetadata) { try { const { messageID, senderDeviceID } = messageSourceMetadata; const deviceID = await getContentSigningKey(); const message: MessageProcessed = { type: peerToPeerMessageTypes.MESSAGE_PROCESSED, messageID, deviceID, }; await sendMessage({ deviceID: senderDeviceID, payload: JSON.stringify(message), }); await sqliteAPI.removeInboundP2PMessages([messageID]); } catch (e) { console.log( `Error while sending confirmation: ${ getMessageForException(e) ?? 'unknown error' }`, ); } } })(); - }, [queueFront, dispatch, processDBStoreOperations, sendMessage, sqliteAPI]); + }, [ + queueFront, + dispatch, + processDBStoreOperations, + sendMessage, + sqliteAPI, + processOutboundMessages, + ]); return null; } export { DBOpsHandler }; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js new file mode 100644 index 000000000..504eda8b6 --- /dev/null +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -0,0 +1,190 @@ +// @flow + +import invariant from 'invariant'; +import * as React from 'react'; + +import { + type TunnelbrokerClientMessageToDevice, + useTunnelbroker, +} from './tunnelbroker-context.js'; +import { + IdentityClientContext, + type IdentityClientContextType, +} from '../shared/identity-client-context.js'; +import { + type OutboundP2PMessage, + outboundP2PMessageStatuses, +} from '../types/sqlite-types.js'; +import { + type EncryptedMessage, + peerToPeerMessageTypes, +} from '../types/tunnelbroker/peer-to-peer-message-types.js'; +import { getConfig } from '../utils/config.js'; +import { createOlmSessionWithPeer } from '../utils/crypto-utils.js'; +import { getMessageForException } from '../utils/errors.js'; + +type PeerToPeerContextType = { + +processOutboundMessages: () => void, +}; + +const PeerToPeerContext: React.Context = + React.createContext(); + +type Props = { + +children: React.Node, +}; + +async function processOutboundP2PMessages( + sendMessage: ( + message: TunnelbrokerClientMessageToDevice, + messageID: ?string, + ) => Promise, + identityContext: IdentityClientContextType, +): Promise { + const authMetadata = await identityContext.getAuthMetadata(); + if (!authMetadata.deviceID || !authMetadata.userID || !authMetadata.userID) { + return; + } + + const { olmAPI, sqliteAPI } = getConfig(); + await olmAPI.initializeCryptoAccount(); + const messages = await sqliteAPI.getAllOutboundP2PMessage(); + + const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {}; + for (const message: OutboundP2PMessage of messages) { + if (!devicesMap[message.deviceID]) { + devicesMap[message.deviceID] = [message]; + } else { + devicesMap[message.deviceID].push(message); + } + } + + const sendMessageToPeer = async ( + message: OutboundP2PMessage, + ): Promise => { + if (!authMetadata.deviceID || !authMetadata.userID) { + return; + } + const encryptedMessage: EncryptedMessage = { + type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE, + senderInfo: { + deviceID: authMetadata.deviceID, + userID: authMetadata.userID, + }, + encryptedData: JSON.parse(message.ciphertext), + }; + await sendMessage( + { + deviceID: message.deviceID, + payload: JSON.stringify(encryptedMessage), + }, + message.messageID, + ); + await sqliteAPI.markOutboundP2PMessageAsSent( + message.messageID, + message.deviceID, + ); + }; + + for (const peerDeviceID in devicesMap) { + for (const message of devicesMap[peerDeviceID]) { + if (message.status === outboundP2PMessageStatuses.persisted) { + try { + const result = await olmAPI.encryptAndPersist( + message.plaintext, + message.deviceID, + message.messageID, + ); + + const encryptedMessage: OutboundP2PMessage = { + ...message, + ciphertext: JSON.stringify(result), + }; + await sendMessageToPeer(encryptedMessage); + } catch (e) { + try { + await createOlmSessionWithPeer( + authMetadata, + identityContext.identityClient, + sendMessage, + message.userID, + peerDeviceID, + ); + const result = await olmAPI.encryptAndPersist( + message.plaintext, + message.deviceID, + message.messageID, + ); + const encryptedMessage: OutboundP2PMessage = { + ...message, + ciphertext: JSON.stringify(result), + }; + + await sendMessageToPeer(encryptedMessage); + } catch (err) { + console.log(`Error sending messages to peer ${peerDeviceID}`, err); + break; + } + } + } else if (message.status === outboundP2PMessageStatuses.encrypted) { + await sendMessageToPeer(message); + } + } + } +} + +function PeerToPeerProvider(props: Props): React.Node { + const { children } = props; + + const restartPromise = React.useRef(false); + const promiseRunning = React.useRef(false); + + const { sendMessage } = useTunnelbroker(); + const identityContext = React.useContext(IdentityClientContext); + invariant(identityContext, 'Identity context should be set'); + + const processOutboundMessages = React.useCallback(() => { + if (!promiseRunning.current) { + promiseRunning.current = true; + void (async () => { + do { + restartPromise.current = false; + try { + await processOutboundP2PMessages(sendMessage, identityContext); + } catch (e) { + console.log( + `Error processing outbound P2P messages: ${ + getMessageForException(e) ?? 'unknown' + }`, + ); + } + } while (restartPromise.current); + promiseRunning.current = false; + })(); + } else { + restartPromise.current = true; + } + }, [identityContext, sendMessage]); + + const value: PeerToPeerContextType = React.useMemo( + () => ({ + processOutboundMessages, + }), + [processOutboundMessages], + ); + + return ( + + {children} + + ); +} + +function usePeerToPeerCommunication(): PeerToPeerContextType { + const context = React.useContext(PeerToPeerContext); + invariant(context, 'PeerToPeerContext not found'); + + return context; +} + +export { PeerToPeerProvider, usePeerToPeerCommunication }; diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js index 08d597699..8a0cb4b63 100644 --- a/lib/tunnelbroker/tunnelbroker-context.js +++ b/lib/tunnelbroker/tunnelbroker-context.js @@ -1,487 +1,488 @@ // @flow import invariant from 'invariant'; import _isEqual from 'lodash/fp/isEqual.js'; import * as React from 'react'; import uuid from 'uuid'; +import { PeerToPeerProvider } from './peer-to-peer-context.js'; import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js'; import { tunnnelbrokerURL } from '../facts/tunnelbroker.js'; import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js'; import { IdentityClientContext } from '../shared/identity-client-context.js'; import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js'; import { isWebPlatform } from '../types/device-types.js'; import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js'; import type { MessageSentStatus } from '../types/tunnelbroker/message-to-device-request-status-types.js'; import type { MessageToDeviceRequest } from '../types/tunnelbroker/message-to-device-request-types.js'; import { type TunnelbrokerMessage, tunnelbrokerMessageTypes, tunnelbrokerMessageValidator, } from '../types/tunnelbroker/messages.js'; import { type PeerToPeerMessage, peerToPeerMessageValidator, } from '../types/tunnelbroker/peer-to-peer-message-types.js'; import type { AnonymousInitializationMessage, ConnectionInitializationMessage, TunnelbrokerInitializationMessage, TunnelbrokerDeviceTypes, } from '../types/tunnelbroker/session-types.js'; import type { Heartbeat } from '../types/websocket/heartbeat-types.js'; import { getConfig } from '../utils/config.js'; import { getContentSigningKey } from '../utils/crypto-utils.js'; import { useSelector } from '../utils/redux-utils.js'; export type TunnelbrokerClientMessageToDevice = { +deviceID: string, +payload: string, }; export type TunnelbrokerSocketListener = ( message: TunnelbrokerMessage, ) => mixed; type PromiseCallbacks = { +resolve: () => void, +reject: (error: string) => void, }; type Promises = { [clientMessageID: string]: PromiseCallbacks }; type TunnelbrokerContextType = { +sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise, +addListener: (listener: TunnelbrokerSocketListener) => void, +removeListener: (listener: TunnelbrokerSocketListener) => void, +connected: boolean, +isAuthorized: boolean, +setUnauthorizedDeviceID: (unauthorizedDeviceID: ?string) => void, }; const TunnelbrokerContext: React.Context = React.createContext(); type Props = { +children: React.Node, +shouldBeClosed?: boolean, +onClose?: () => mixed, +secondaryTunnelbrokerConnection?: SecondaryTunnelbrokerConnection, }; function getTunnelbrokerDeviceType(): TunnelbrokerDeviceTypes { return isWebPlatform(getConfig().platformDetails.platform) ? 'web' : 'mobile'; } function createAnonymousInitMessage( deviceID: string, ): AnonymousInitializationMessage { return ({ type: 'AnonymousInitializationMessage', deviceID, deviceType: getTunnelbrokerDeviceType(), }: AnonymousInitializationMessage); } function TunnelbrokerProvider(props: Props): React.Node { const { children, shouldBeClosed, onClose, secondaryTunnelbrokerConnection } = props; const accessToken = useSelector(state => state.commServicesAccessToken); const userID = useSelector(state => state.currentUserInfo?.id); const [unauthorizedDeviceID, setUnauthorizedDeviceID] = React.useState(null); const isAuthorized = !unauthorizedDeviceID; const createInitMessage = React.useCallback(async () => { if (shouldBeClosed) { return null; } if (unauthorizedDeviceID) { return createAnonymousInitMessage(unauthorizedDeviceID); } if (!accessToken || !userID) { return null; } const deviceID = await getContentSigningKey(); if (!deviceID) { return null; } return ({ type: 'ConnectionInitializationMessage', deviceID, accessToken, userID, deviceType: getTunnelbrokerDeviceType(), }: ConnectionInitializationMessage); }, [accessToken, shouldBeClosed, unauthorizedDeviceID, userID]); const previousInitMessage = React.useRef(null); const [connected, setConnected] = React.useState(false); const listeners = React.useRef>(new Set()); const socket = React.useRef(null); const currentlyProcessedMessage = React.useRef>(null); const socketSessionCounter = React.useRef(0); const promises = React.useRef({}); const heartbeatTimeoutID = React.useRef(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); const { identityClient } = identityContext; const stopHeartbeatTimeout = React.useCallback(() => { if (heartbeatTimeoutID.current) { clearTimeout(heartbeatTimeoutID.current); heartbeatTimeoutID.current = null; } }, []); const resetHeartbeatTimeout = React.useCallback(() => { stopHeartbeatTimeout(); heartbeatTimeoutID.current = setTimeout(() => { socket.current?.close(); setConnected(false); }, tunnelbrokerHeartbeatTimeout); }, [stopHeartbeatTimeout]); // determine if the socket is active (not closed or closing) const isSocketActive = socket.current?.readyState === WebSocket.OPEN || socket.current?.readyState === WebSocket.CONNECTING; const connectionChangePromise = React.useRef>(null); // The Tunnelbroker connection can have 4 states: // - DISCONNECTED: isSocketActive = false, connected = false // Should be in this state when initMessage is null // - CONNECTING: isSocketActive = true, connected = false // This lasts until Tunnelbroker sends ConnectionInitializationResponse // - CONNECTED: isSocketActive = true, connected = true // - DISCONNECTING: isSocketActive = false, connected = true // This lasts between socket.close() and socket.onclose() React.useEffect(() => { connectionChangePromise.current = (async () => { await connectionChangePromise.current; try { const initMessage = await createInitMessage(); const initMessageChanged = !_isEqual( previousInitMessage.current, initMessage, ); previousInitMessage.current = initMessage; // when initMessage changes, we need to close the socket // and open a new one if ( (!initMessage || initMessageChanged) && isSocketActive && socket.current ) { socket.current?.close(); return; } // when we're already connected (or pending disconnection), // or there's no init message to start with, we don't need // to do anything if (connected || !initMessage || socket.current) { return; } const tunnelbrokerSocket = new WebSocket(tunnnelbrokerURL); tunnelbrokerSocket.onopen = () => { tunnelbrokerSocket.send(JSON.stringify(initMessage)); }; tunnelbrokerSocket.onclose = () => { // this triggers the effect hook again and reconnect setConnected(false); onClose?.(); socket.current = null; console.log('Connection to Tunnelbroker closed'); }; tunnelbrokerSocket.onerror = e => { console.log('Tunnelbroker socket error:', e.message); }; tunnelbrokerSocket.onmessage = (event: MessageEvent) => { if (typeof event.data !== 'string') { console.log('socket received a non-string message'); return; } let rawMessage; try { rawMessage = JSON.parse(event.data); } catch (e) { console.log('error while parsing Tunnelbroker message:', e.message); return; } if (!tunnelbrokerMessageValidator.is(rawMessage)) { console.log('invalid TunnelbrokerMessage'); return; } const message: TunnelbrokerMessage = rawMessage; resetHeartbeatTimeout(); for (const listener of listeners.current) { listener(message); } if ( message.type === tunnelbrokerMessageTypes.CONNECTION_INITIALIZATION_RESPONSE ) { if (message.status.type === 'Success' && !connected) { setConnected(true); console.log( 'session with Tunnelbroker created. isAuthorized:', isAuthorized, ); } else if (message.status.type === 'Success' && connected) { console.log( 'received ConnectionInitializationResponse with status: Success for already connected socket', ); } else { setConnected(false); console.log( 'creating session with Tunnelbroker error:', message.status.data, ); } } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE ) { const confirmation: MessageReceiveConfirmation = { type: tunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION, messageIDs: [message.messageID], }; let rawPeerToPeerMessage; try { rawPeerToPeerMessage = JSON.parse(message.payload); } catch (e) { console.log( 'error while parsing Tunnelbroker peer-to-peer message:', e.message, ); // Client received incorrect message, confirm to remove from // Tunnelbroker queue. socket.current?.send(JSON.stringify(confirmation)); return; } if (!peerToPeerMessageValidator.is(rawPeerToPeerMessage)) { console.log('invalid Tunnelbroker PeerToPeerMessage'); // The client received an invalid Tunnelbroker message, // and cannot process this type of request. socket.current?.send(JSON.stringify(confirmation)); return; } const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage; currentlyProcessedMessage.current = (async () => { const localSocketSessionCounter = socketSessionCounter.current; await currentlyProcessedMessage.current; // Since scheduling processing this message socket is closed // or was closed and reopened, we have to stop processing // because Tunnelbroker flushes the message again when opening // the socket, and we want to process this only once // to maintain order. if ( localSocketSessionCounter !== socketSessionCounter.current || !socket.current ) { return; } try { await peerToPeerMessageHandler( peerToPeerMessage, identityClient, message.messageID, ); } catch (e) { console.log(e.message); } finally { if ( localSocketSessionCounter === socketSessionCounter.current && socket.current ) { // We confirm regardless of success or error while processing. socket.current.send(JSON.stringify(confirmation)); } } })(); } else if ( message.type === tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST_STATUS ) { for (const status: MessageSentStatus of message.clientMessageIDs) { if (status.type === 'Success') { promises.current[status.data]?.resolve(); delete promises.current[status.data]; } else if (status.type === 'Error') { promises.current[status.data.id]?.reject(status.data.error); delete promises.current[status.data.id]; } else if (status.type === 'SerializationError') { console.log('SerializationError for message: ', status.data); } else if (status.type === 'InvalidRequest') { console.log('Tunnelbroker recorded InvalidRequest'); } } } else if (message.type === tunnelbrokerMessageTypes.HEARTBEAT) { const heartbeat: Heartbeat = { type: tunnelbrokerMessageTypes.HEARTBEAT, }; socket.current?.send(JSON.stringify(heartbeat)); } }; socket.current = tunnelbrokerSocket; socketSessionCounter.current = socketSessionCounter.current + 1; } catch (err) { console.log('Tunnelbroker connection error:', err); } })(); }, [ connected, isSocketActive, isAuthorized, resetHeartbeatTimeout, stopHeartbeatTimeout, identityClient, onClose, createInitMessage, ]); const sendMessageToDeviceRequest: ( request: MessageToDeviceRequest, ) => Promise = React.useCallback( request => { return new Promise((resolve, reject) => { const socketActive = connected && socket.current; if (!shouldBeClosed && !socketActive) { throw new Error('Tunnelbroker not connected'); } promises.current[request.clientMessageID] = { resolve, reject, }; if (socketActive) { socket.current?.send(JSON.stringify(request)); } else { secondaryTunnelbrokerConnection?.sendMessage(request); } }); }, [connected, secondaryTunnelbrokerConnection, shouldBeClosed], ); const sendMessage: ( message: TunnelbrokerClientMessageToDevice, messageID: ?string, ) => Promise = React.useCallback( (message: TunnelbrokerClientMessageToDevice, messageID: ?string) => { const clientMessageID = messageID ?? uuid.v4(); const messageToDevice: MessageToDeviceRequest = { type: tunnelbrokerMessageTypes.MESSAGE_TO_DEVICE_REQUEST, clientMessageID, deviceID: message.deviceID, payload: message.payload, }; return sendMessageToDeviceRequest(messageToDevice); }, [sendMessageToDeviceRequest], ); React.useEffect( () => secondaryTunnelbrokerConnection?.onSendMessage(message => { if (shouldBeClosed) { // We aren't supposed to be handling it return; } void (async () => { try { await sendMessageToDeviceRequest(message); secondaryTunnelbrokerConnection.setMessageStatus( message.clientMessageID, ); } catch (error) { secondaryTunnelbrokerConnection.setMessageStatus( message.clientMessageID, error, ); } })(); }), [ secondaryTunnelbrokerConnection, sendMessageToDeviceRequest, shouldBeClosed, ], ); React.useEffect( () => secondaryTunnelbrokerConnection?.onMessageStatus((messageID, error) => { if (error) { promises.current[messageID].reject(error); } else { promises.current[messageID].resolve(); } delete promises.current[messageID]; }), [secondaryTunnelbrokerConnection], ); const addListener = React.useCallback( (listener: TunnelbrokerSocketListener) => { listeners.current.add(listener); }, [], ); const removeListener = React.useCallback( (listener: TunnelbrokerSocketListener) => { listeners.current.delete(listener); }, [], ); const value: TunnelbrokerContextType = React.useMemo( () => ({ sendMessage, connected, isAuthorized, addListener, removeListener, setUnauthorizedDeviceID, }), [addListener, connected, removeListener, sendMessage, isAuthorized], ); return ( - {children} + {children} ); } function useTunnelbroker(): TunnelbrokerContextType { const context = React.useContext(TunnelbrokerContext); invariant(context, 'TunnelbrokerContext not found'); return context; } export { TunnelbrokerProvider, useTunnelbroker }; diff --git a/lib/types/sqlite-types.js b/lib/types/sqlite-types.js index d2f9a1adb..8d65b44f4 100644 --- a/lib/types/sqlite-types.js +++ b/lib/types/sqlite-types.js @@ -1,42 +1,59 @@ // @flow import type { StoreOperations } from './store-ops-types.js'; +export const outboundP2PMessageStatuses = Object.freeze({ + // The message was prepared to be sent to other peers, but it's not encrypted. + // It was inserted into DB in the same transaction as making changes to + // the store. + persisted: 'persisted', + // Encryption is done in the same transaction as persisting the CryptoModule, + // and message order is also tracked on the client side, + // which means the message can be sent. + encrypted: 'encrypted', + // The message was sent to another peer (Tunnelbroker owns it), + // waiting for the confirmation (handled in `peerToPeerMessageHandler`). + sent: 'sent', +}); +export type OutboundP2PMessageStatuses = $Values< + typeof outboundP2PMessageStatuses, +>; + export type InboundP2PMessage = { +messageID: string, +senderDeviceID: string, +plaintext: string, +status: string, }; export type OutboundP2PMessage = { +messageID: string, +deviceID: string, +userID: string, +timestamp: string, +plaintext: string, +ciphertext: string, - +status: string, + +status: OutboundP2PMessageStatuses, }; export type SQLiteAPI = { // read operations +getAllInboundP2PMessage: () => Promise, +getAllOutboundP2PMessage: () => Promise, // write operations +removeInboundP2PMessages: (ids: $ReadOnlyArray) => Promise, +markOutboundP2PMessageAsSent: ( messageID: string, deviceID: string, ) => Promise, +removeOutboundP2PMessagesOlderThan: ( messageID: string, deviceID: string, ) => Promise, +processDBStoreOperations: ( operations: StoreOperations, userID?: ?string, ) => Promise, };