diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js
--- a/lib/handlers/db-ops-handler.react.js
+++ b/lib/handlers/db-ops-handler.react.js
@@ -3,6 +3,7 @@
 import * as React from 'react';
 
 import { opsProcessingFinishedActionType } from '../actions/db-ops-actions.js';
+import { usePeerToPeerCommunication } from '../tunnelbroker/peer-to-peer-context.js';
 import { useTunnelbroker } from '../tunnelbroker/tunnelbroker-context.js';
 import type { DBOpsEntry } from '../types/db-ops-types.js';
 import type { StoreOperations } from '../types/store-ops-types.js';
@@ -25,6 +26,7 @@
   const queueFront = useSelector(state => state.dbOpsStore.queuedOps[0]);
   const prevQueueFront = React.useRef(null);
   const { sendMessage } = useTunnelbroker();
+  const { processOutboundMessages } = usePeerToPeerCommunication();
 
   const dispatch = useDispatch();
 
@@ -38,6 +40,9 @@
     void (async () => {
       if (ops) {
         await processDBStoreOperations(ops);
+        if (ops.outboundP2PMessages && ops.outboundP2PMessages.length > 0) {
+          processOutboundMessages();
+        }
       }
       dispatch({
         type: opsProcessingFinishedActionType,
@@ -65,7 +70,14 @@
         }
       }
     })();
-  }, [queueFront, dispatch, processDBStoreOperations, sendMessage, sqliteAPI]);
+  }, [
+    queueFront,
+    dispatch,
+    processDBStoreOperations,
+    sendMessage,
+    sqliteAPI,
+    processOutboundMessages,
+  ]);
 
   return null;
 }
diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js
new file mode 100644
--- /dev/null
+++ b/lib/tunnelbroker/peer-to-peer-context.js
@@ -0,0 +1,218 @@
+// @flow
+
+import invariant from 'invariant';
+import * as React from 'react';
+
+import {
+  type TunnelbrokerClientMessageToDevice,
+  useTunnelbroker,
+} from './tunnelbroker-context.js';
+import {
+  IdentityClientContext,
+  type IdentityClientContextType,
+} from '../shared/identity-client-context.js';
+import {
+  type OutboundP2PMessage,
+  outboundP2PMessageStatuses,
+} from '../types/sqlite-types.js';
+import {
+  type EncryptedMessage,
+  peerToPeerMessageTypes,
+} from '../types/tunnelbroker/peer-to-peer-message-types.js';
+import { getConfig } from '../utils/config.js';
+import { createOlmSessionWithPeer } from '../utils/crypto-utils.js';
+import { getMessageForException } from '../utils/errors.js';
+
+type PeerToPeerContextType = {
+  +processOutboundMessages: () => void,
+};
+
+const PeerToPeerContext: React.Context<?PeerToPeerContextType> =
+  React.createContext<?PeerToPeerContextType>();
+
+type Props = {
+  +children: React.Node,
+};
+
+// Loads the messages returned by `sqliteAPI.getAllOutboundP2PMessage()`,
+// groups them by target device and sends each device's messages one at a
+// time, in the order they were returned. `persisted` messages are encrypted
+// before sending, `encrypted` messages are sent as they are, any other status
+// is skipped. When sending to a peer fails, the rest of that peer's messages
+// are not attempted in this run and processing moves on to the next peer.
+async function processOutboundP2PMessages(
+  sendMessage: (
+    message: TunnelbrokerClientMessageToDevice,
+    messageID: ?string,
+  ) => Promise<void>,
+  identityContext: IdentityClientContextType,
+): Promise<void> {
+  const authMetadata = await identityContext.getAuthMetadata();
+  // NOTE(review): the third operand used to repeat `userID`; `accessToken` is
+  // presumably the field that was meant to be checked - confirm against the
+  // auth metadata type.
+  if (
+    !authMetadata.deviceID ||
+    !authMetadata.userID ||
+    !authMetadata.accessToken
+  ) {
+    return;
+  }
+
+  const { olmAPI, sqliteAPI } = getConfig();
+  await olmAPI.initializeCryptoAccount();
+  const messages = await sqliteAPI.getAllOutboundP2PMessage();
+
+  // Group by device so that a failure only stops that device's messages.
+  const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {};
+  for (const message: OutboundP2PMessage of messages) {
+    if (!devicesMap[message.deviceID]) {
+      devicesMap[message.deviceID] = [message];
+    } else {
+      devicesMap[message.deviceID].push(message);
+    }
+  }
+
+  // Wraps the ciphertext in an `ENCRYPTED_MESSAGE`, sends it with
+  // `sendMessage` and then marks the message as sent through `sqliteAPI`.
+  const sendMessageToPeer = async (
+    message: OutboundP2PMessage,
+  ): Promise<void> => {
+    if (!authMetadata.deviceID || !authMetadata.userID) {
+      return;
+    }
+    const encryptedMessage: EncryptedMessage = {
+      type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
+      senderInfo: {
+        deviceID: authMetadata.deviceID,
+        userID: authMetadata.userID,
+      },
+      encryptedData: JSON.parse(message.ciphertext),
+    };
+    await sendMessage(
+      {
+        deviceID: message.deviceID,
+        payload: JSON.stringify(encryptedMessage),
+      },
+      message.messageID,
+    );
+    await sqliteAPI.markOutboundP2PMessageAsSent(
+      message.messageID,
+      message.deviceID,
+    );
+  };
+
+  // Runs `olmAPI.encryptAndPersist` on the plaintext and sends the
+  // JSON-serialized result as the message's ciphertext.
+  const encryptAndSendMessageToPeer = async (
+    message: OutboundP2PMessage,
+  ): Promise<void> => {
+    const result = await olmAPI.encryptAndPersist(
+      message.plaintext,
+      message.deviceID,
+      message.messageID,
+    );
+    await sendMessageToPeer({
+      ...message,
+      ciphertext: JSON.stringify(result),
+    });
+  };
+
+  for (const peerDeviceID in devicesMap) {
+    for (const message of devicesMap[peerDeviceID]) {
+      if (message.status === outboundP2PMessageStatuses.persisted) {
+        try {
+          await encryptAndSendMessageToPeer(message);
+        } catch (e) {
+          // First attempt failed: create an Olm session with the peer and
+          // retry once before giving up on this peer.
+          try {
+            await createOlmSessionWithPeer(
+              authMetadata,
+              identityContext.identityClient,
+              sendMessage,
+              message.userID,
+              peerDeviceID,
+            );
+            await encryptAndSendMessageToPeer(message);
+          } catch (err) {
+            console.log(`Error sending messages to peer ${peerDeviceID}`, err);
+            break;
+          }
+        }
+      } else if (message.status === outboundP2PMessageStatuses.encrypted) {
+        try {
+          await sendMessageToPeer(message);
+        } catch (err) {
+          // Same policy as above: a failed send stops this peer's messages
+          // only, instead of aborting the run for every remaining peer.
+          console.log(`Error sending messages to peer ${peerDeviceID}`, err);
+          break;
+        }
+      }
+    }
+  }
+}
+
+// Exposes `processOutboundMessages` to descendants via `PeerToPeerContext`.
+// At most one processing loop is active at a time: a call made while a loop
+// is active only sets a flag that makes the loop do one more pass.
+function PeerToPeerProvider(props: Props): React.Node {
+  const { children } = props;
+
+  // Set when a run is requested while the loop below is already active.
+  const restartPromise = React.useRef(false);
+  // True while the async loop below is active.
+  const promiseRunning = React.useRef(false);
+
+  const { sendMessage } = useTunnelbroker();
+  const identityContext = React.useContext(IdentityClientContext);
+  invariant(identityContext, 'Identity context should be set');
+
+  const processOutboundMessages = React.useCallback(() => {
+    if (!promiseRunning.current) {
+      promiseRunning.current = true;
+      void (async () => {
+        do {
+          restartPromise.current = false;
+          try {
+            await processOutboundP2PMessages(sendMessage, identityContext);
+          } catch (e) {
+            console.log(
+              `Error processing outbound P2P messages: ${
+                getMessageForException(e) ?? 'unknown'
+              }`,
+            );
+          }
+        } while (restartPromise.current);
+        promiseRunning.current = false;
+      })();
+    } else {
+      restartPromise.current = true;
+    }
+  }, [identityContext, sendMessage]);
+
+  const value: PeerToPeerContextType = React.useMemo(
+    () => ({
+      processOutboundMessages,
+    }),
+    [processOutboundMessages],
+  );
+
+  return (
+    <PeerToPeerContext.Provider value={value}>
+      {children}
+    </PeerToPeerContext.Provider>
+  );
+}
+
+// Returns the current `PeerToPeerContext` value; the invariant fails when no
+// value is available.
+function usePeerToPeerCommunication(): PeerToPeerContextType {
+  const context = React.useContext(PeerToPeerContext);
+  invariant(context, 'PeerToPeerContext not found');
+
+  return context;
+}
+
+export { PeerToPeerProvider, usePeerToPeerCommunication };
diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js
--- a/lib/tunnelbroker/tunnelbroker-context.js
+++ b/lib/tunnelbroker/tunnelbroker-context.js
@@ -5,6 +5,7 @@
 import * as React from 'react';
 import uuid from 'uuid';
 
+import { PeerToPeerProvider } from './peer-to-peer-context.js';
 import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js';
 import { tunnnelbrokerURL } from '../facts/tunnelbroker.js';
 import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js';
@@ -472,7 +473,7 @@
 
   return (
     <TunnelbrokerContext.Provider value={value}>
-      {children}
+      <PeerToPeerProvider>{children}</PeerToPeerProvider>
     </TunnelbrokerContext.Provider>
   );
 }
diff --git a/lib/types/sqlite-types.js b/lib/types/sqlite-types.js
--- a/lib/types/sqlite-types.js
+++ b/lib/types/sqlite-types.js
@@ -2,6 +2,23 @@
 
 import type { StoreOperations } from './store-ops-types.js';
 
+export const outboundP2PMessageStatuses = Object.freeze({
+  // The message was prepared to be sent to other peers, but it's not encrypted.
+  // It was inserted into DB in the same transaction as making changes to
+  // the store.
+  persisted: 'persisted',
+  // Encryption is done in the same transaction as persisting the CryptoModule,
+  // and message order is also tracked on the client side,
+  // which means the message can be sent.
+  encrypted: 'encrypted',
+  // The message was sent to another peer (Tunnelbroker owns it),
+  // waiting for the confirmation (handled in `peerToPeerMessageHandler`).
+  sent: 'sent',
+});
+export type OutboundP2PMessageStatuses = $Values<
+  typeof outboundP2PMessageStatuses,
+>;
+
 export type InboundP2PMessage = {
   +messageID: string,
   +senderDeviceID: string,
@@ -16,7 +33,7 @@
   +timestamp: string,
   +plaintext: string,
   +ciphertext: string,
-  +status: string,
+  +status: OutboundP2PMessageStatuses,
 };
 
 export type SQLiteAPI = {