diff --git a/lib/handlers/peer-to-peer-message-handler.js b/lib/handlers/peer-to-peer-message-handler.js
--- a/lib/handlers/peer-to-peer-message-handler.js
+++ b/lib/handlers/peer-to-peer-message-handler.js
@@ -3,24 +3,35 @@
 import _isEqual from 'lodash/fp/isEqual.js';
 
 import { verifyAndGetDeviceList } from '../shared/device-list-utils.js';
+import type { MessageSourceMetadata } from '../types/db-ops-types.js';
 import type {
   IdentityServiceClient,
   DeviceOlmInboundKeys,
 } from '../types/identity-service-types.js';
+import type { SuperAction } from '../types/redux-types.js';
 import {
   peerToPeerMessageTypes,
   type PeerToPeerMessage,
 } from '../types/tunnelbroker/peer-to-peer-message-types.js';
+import { syncingP2PMessageValidator } from '../types/tunnelbroker/syncing-peer-to-peer-message-types.js';
+import type { SyncingP2PMessage } from '../types/tunnelbroker/syncing-peer-to-peer-message-types.js';
 import { getConfig } from '../utils/config.js';
 import { getContentSigningKey } from '../utils/crypto-utils.js';
 import { getMessageForException } from '../utils/errors.js';
 import { hasHigherDeviceID, olmSessionErrors } from '../utils/olm-utils.js';
-import { getClientMessageIDFromTunnelbrokerMessageID } from '../utils/peer-to-peer-communication-utils.js';
+import {
+  getActionFromOutboundP2PMessage,
+  getClientMessageIDFromTunnelbrokerMessageID,
+} from '../utils/peer-to-peer-communication-utils.js';
 
 async function peerToPeerMessageHandler(
   message: PeerToPeerMessage,
   identityClient: IdentityServiceClient,
   messageID: string,
+  dispatch: (
+    action: SuperAction,
+    messageSourceMetadata: MessageSourceMetadata,
+  ) => mixed,
 ): Promise<void> {
   const { olmAPI, sqliteAPI } = getConfig();
   if (message.type === peerToPeerMessageTypes.OUTBOUND_SESSION_CREATION) {
@@ -106,6 +117,25 @@
         'Decrypted message from device ' +
           `${message.senderInfo.deviceID}: ${decrypted}`,
       );
+
+      // A decrypted payload may be a syncing message from a peer; if so,
+      // turn it into an action and dispatch it along with its source.
+      try {
+        const parsedMessageToDevice = JSON.parse(decrypted);
+        if (!syncingP2PMessageValidator.is(parsedMessageToDevice)) {
+          return;
+        }
+        const syncingP2PMessage: SyncingP2PMessage = parsedMessageToDevice;
+        const action = getActionFromOutboundP2PMessage(syncingP2PMessage);
+        if (action) {
+          dispatch(action, {
+            messageID,
+            senderDeviceID: message.senderInfo.deviceID,
+          });
+        }
+      } catch (e) {
+        console.log(e);
+      }
     } catch (e) {
       if (e.message?.includes(olmSessionErrors.messageAlreadyDecrypted)) {
         console.log(
diff --git a/lib/reducers/master-reducer.js b/lib/reducers/master-reducer.js
--- a/lib/reducers/master-reducer.js
+++ b/lib/reducers/master-reducer.js
@@ -42,8 +42,10 @@
   fullStateSyncActionType,
   incrementalStateSyncActionType,
 } from '../types/socket-types.js';
+import type { OutboundP2PMessage } from '../types/sqlite-types.js';
 import type { StoreOperations } from '../types/store-ops-types.js';
 import { isDev } from '../utils/dev-utils.js';
+import { getOutboundP2PMessagesFromAction } from '../utils/peer-to-peer-communication-utils.js';
 
 export default function baseReducer>(
   state: T,
@@ -183,6 +185,11 @@
   const { threadActivityStore, threadActivityStoreOperations } =
     reduceThreadActivity(state.threadActivityStore, action);
 
+  const currentUserInfo = reduceCurrentUserInfo(state.currentUserInfo, action);
+
+  const outboundP2PMessages: $ReadOnlyArray<OutboundP2PMessage> =
+    getOutboundP2PMessagesFromAction(action, auxUserStore, currentUserInfo);
+
   return {
     state: {
       ...state,
@@ -190,7 +197,7 @@
       draftStore,
       entryStore,
       loadingStatuses: reduceLoadingStatuses(state.loadingStatuses, action),
-      currentUserInfo: reduceCurrentUserInfo(state.currentUserInfo, action),
+      currentUserInfo,
       threadStore,
       userStore,
       messageStore,
@@ -228,6 +235,7 @@
       syncedMetadataStoreOperations,
       auxUserStoreOperations,
       threadActivityStoreOperations,
+      outboundP2PMessages,
     },
   };
 }
diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js
--- a/lib/tunnelbroker/tunnelbroker-context.js
+++ b/lib/tunnelbroker/tunnelbroker-context.js
@@ -9,6 +9,7 @@
 import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js';
 import { tunnnelbrokerURL } from '../facts/tunnelbroker.js';
 import { peerToPeerMessageHandler } from '../handlers/peer-to-peer-message-handler.js';
+import { useDispatchWithMessageSource } from '../hooks/ops-hooks.js';
 import { IdentityClientContext } from '../shared/identity-client-context.js';
 import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js';
 import { isWebPlatform } from '../types/device-types.js';
@@ -134,6 +135,7 @@
   const socketSessionCounter = React.useRef(0);
   const promises = React.useRef({});
   const heartbeatTimeoutID = React.useRef();
+  const dispatchWithMessageSource = useDispatchWithMessageSource();
 
   const identityContext = React.useContext(IdentityClientContext);
   invariant(identityContext, 'Identity context should be set');
@@ -308,6 +310,7 @@
             peerToPeerMessage,
             identityClient,
             message.messageID,
+            dispatchWithMessageSource,
           );
         } catch (e) {
           console.log(e.message);
@@ -361,6 +364,7 @@
     identityClient,
     onClose,
     createInitMessage,
+    dispatchWithMessageSource,
   ]);
 
   const sendMessageToDeviceRequest: (
diff --git a/lib/types/tunnelbroker/syncing-peer-to-peer-message-types.js b/lib/types/tunnelbroker/syncing-peer-to-peer-message-types.js
new file mode 100644
--- /dev/null
+++ b/lib/types/tunnelbroker/syncing-peer-to-peer-message-types.js
@@ -0,0 +1,46 @@
+// @flow
+
+import type { TInterface, TUnion } from 'tcomb';
+import t from 'tcomb';
+
+import { tShape, tString } from '../../utils/validation-utils.js';
+
+// Type tags of the messages used to sync state between peer devices.
+export const syncingP2PMessageTypes = Object.freeze({
+  ADD_KEYSERVER: 'ADD_KEYSERVER',
+  REMOVE_KEYSERVER: 'REMOVE_KEYSERVER',
+});
+
+// Tells a peer that a keyserver was added, and at which URL prefix.
+export type AddKeyserverP2PMessage = {
+  +type: 'ADD_KEYSERVER',
+  +keyserverAdminUserID: string,
+  +urlPrefix: string,
+};
+export const addKeyserverP2PMessageValidator: TInterface<AddKeyserverP2PMessage> =
+  tShape<AddKeyserverP2PMessage>({
+    type: tString(syncingP2PMessageTypes.ADD_KEYSERVER),
+    keyserverAdminUserID: t.String,
+    urlPrefix: t.String,
+  });
+
+// Tells a peer that a keyserver was removed.
+export type RemoveKeyserverP2PMessage = {
+  +type: 'REMOVE_KEYSERVER',
+  +keyserverAdminUserID: string,
+};
+export const removeKeyserverP2PMessageValidator: TInterface<RemoveKeyserverP2PMessage> =
+  tShape<RemoveKeyserverP2PMessage>({
+    type: tString(syncingP2PMessageTypes.REMOVE_KEYSERVER),
+    keyserverAdminUserID: t.String,
+  });
+
+export type SyncingP2PMessage =
+  | AddKeyserverP2PMessage
+  | RemoveKeyserverP2PMessage;
+
+// Accepts any value matching one of the syncing message shapes above.
+export const syncingP2PMessageValidator: TUnion<SyncingP2PMessage> = t.union([
+  addKeyserverP2PMessageValidator,
+  removeKeyserverP2PMessageValidator,
+]);
diff --git a/lib/utils/peer-to-peer-communication-utils.js b/lib/utils/peer-to-peer-communication-utils.js
--- a/lib/utils/peer-to-peer-communication-utils.js
+++ b/lib/utils/peer-to-peer-communication-utils.js
@@ -1,9 +1,142 @@
 // @flow
 
+import { getUUID } from './uuid.js';
+import {
+  addKeyserverActionType,
+  removeKeyserverActionType,
+} from '../actions/keyserver-actions.js';
+import type { AuxUserStore } from '../types/aux-user-types.js';
+import { defaultKeyserverInfo } from '../types/keyserver-types.js';
+import type { BaseAction } from '../types/redux-types.js';
+import type { OutboundP2PMessage } from '../types/sqlite-types.js';
+import { outboundP2PMessageStatuses } from '../types/sqlite-types.js';
+import {
+  type AddKeyserverP2PMessage,
+  type SyncingP2PMessage,
+  syncingP2PMessageTypes,
+  type RemoveKeyserverP2PMessage,
+} from '../types/tunnelbroker/syncing-peer-to-peer-message-types.js';
+import type { CurrentUserInfo } from '../types/user-types.js';
+
+// Returns the segment between the first and second '#' of the given ID.
+// NOTE(review): yields undefined at runtime when the ID contains no '#'.
 function getClientMessageIDFromTunnelbrokerMessageID(
   clientMessageID: string,
 ): string {
   return clientMessageID.split('#')[1];
 }
 
-export { getClientMessageIDFromTunnelbrokerMessageID };
+// Builds one outbound message per peer device. Every message carries the
+// same serialized payload, an empty ciphertext and the `addressed` status.
+function generateMessagesToPeers(
+  message: SyncingP2PMessage,
+  peers: $ReadOnlyArray<string>,
+  userID: string,
+): $ReadOnlyArray<OutboundP2PMessage> {
+  // Serialize and timestamp once; both values are shared by the whole batch.
+  const plaintext = JSON.stringify(message);
+  const timestamp = Date.now().toString();
+  return peers.map((peerID): OutboundP2PMessage => ({
+    messageID: getUUID(),
+    deviceID: peerID,
+    userID,
+    timestamp,
+    plaintext,
+    ciphertext: '',
+    status: outboundP2PMessageStatuses.addressed,
+  }));
+}
+
+// Looks up the device IDs on the given user's device list, if known.
+function getDeviceIDsForUser(
+  auxUserStore: AuxUserStore,
+  userID: string,
+): ?$ReadOnlyArray<string> {
+  // The user may have no entry in the store yet, so every step is optional.
+  return auxUserStore.auxUserInfos[userID]?.deviceList?.devices;
+}
+
+// Translates a locally dispatched action into the messages that replicate it
+// to the current user's own devices. Returns an empty array when the action
+// is not a synced type, the current user has no ID, or no device list is
+// known for them.
+function getOutboundP2PMessagesFromAction(
+  action: BaseAction,
+  auxUserStore: AuxUserStore,
+  currentUserInfo: ?CurrentUserInfo,
+): $ReadOnlyArray<OutboundP2PMessage> {
+  // Source metadata marks an action that (presumably) originated from a
+  // peer's message; re-broadcasting it would echo it back.
+  if (action.messageSourceMetadata) {
+    return [];
+  }
+  const currentUserID = currentUserInfo?.id;
+  if (!currentUserID) {
+    return [];
+  }
+  if (action.type === addKeyserverActionType) {
+    const ownPeers = getDeviceIDsForUser(auxUserStore, currentUserID);
+    if (!ownPeers) {
+      return [];
+    }
+    const addKeyserverP2PMessage: AddKeyserverP2PMessage = {
+      type: syncingP2PMessageTypes.ADD_KEYSERVER,
+      keyserverAdminUserID: action.payload.keyserverAdminUserID,
+      urlPrefix: action.payload.newKeyserverInfo.urlPrefix,
+    };
+    return generateMessagesToPeers(
+      addKeyserverP2PMessage,
+      ownPeers,
+      currentUserID,
+    );
+  } else if (action.type === removeKeyserverActionType) {
+    const ownPeers = getDeviceIDsForUser(auxUserStore, currentUserID);
+    if (!ownPeers) {
+      return [];
+    }
+    const removeKeyserverP2PMessage: RemoveKeyserverP2PMessage = {
+      type: syncingP2PMessageTypes.REMOVE_KEYSERVER,
+      keyserverAdminUserID: action.payload.keyserverAdminUserID,
+    };
+    return generateMessagesToPeers(
+      removeKeyserverP2PMessage,
+      ownPeers,
+      currentUserID,
+    );
+  }
+  return [];
+}
+
+// Converts a syncing message received from a peer into the Redux action
+// that applies the same change locally, or null for unhandled types.
+function getActionFromOutboundP2PMessage(
+  syncingP2PMessage: SyncingP2PMessage,
+): ?BaseAction {
+  if (syncingP2PMessage.type === syncingP2PMessageTypes.ADD_KEYSERVER) {
+    return {
+      type: addKeyserverActionType,
+      payload: {
+        keyserverAdminUserID: syncingP2PMessage.keyserverAdminUserID,
+        // Seed the keyserver info from the URL prefix the sender shared,
+        // not from the admin's user ID.
+        newKeyserverInfo: defaultKeyserverInfo(syncingP2PMessage.urlPrefix),
+      },
+    };
+  } else if (
+    syncingP2PMessage.type === syncingP2PMessageTypes.REMOVE_KEYSERVER
+  ) {
+    return {
+      type: removeKeyserverActionType,
+      payload: {
+        keyserverAdminUserID: syncingP2PMessage.keyserverAdminUserID,
+      },
+    };
+  }
+  return null;
+}
+
+export {
+  getClientMessageIDFromTunnelbrokerMessageID,
+  getOutboundP2PMessagesFromAction,
+  getActionFromOutboundP2PMessage,
+};