diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js --- a/lib/handlers/db-ops-handler.react.js +++ b/lib/handlers/db-ops-handler.react.js @@ -36,7 +36,7 @@ } prevQueueFront.current = queueFront; - const { ops, messageSourceMetadata } = queueFront; + const { ops, messageSourceMetadata, dmOpID } = queueFront; void (async () => { if (ops) { await processDBStoreOperations(ops); @@ -44,7 +44,7 @@ const messageIDs = ops.outboundP2PMessages.map( message => message.messageID, ); - processOutboundMessages(messageIDs); + processOutboundMessages(messageIDs, dmOpID); } } dispatch({ diff --git a/lib/reducers/db-ops-reducer.js b/lib/reducers/db-ops-reducer.js --- a/lib/reducers/db-ops-reducer.js +++ b/lib/reducers/db-ops-reducer.js @@ -8,6 +8,7 @@ MessageSourceMetadata, DBOpsStore, } from '../types/db-ops-types.js'; +import { scheduleP2PMessagesActionType } from '../types/dm-ops.js'; import { messageTypes } from '../types/message-types-enum.js'; import type { BaseAction } from '../types/redux-types.js'; import type { StoreOperations } from '../types/store-ops-types.js'; @@ -20,6 +21,17 @@ ...store, queuedOps: rest, }; + } else if (action.type === scheduleP2PMessagesActionType) { + const newEntry = { + ops: { + outboundP2PMessages: action.payload.messages, + }, + dmOpID: action.payload.dmOpID, + }; + return { + ...store, + queuedOps: [...store.queuedOps, newEntry], + }; } return store; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -2,15 +2,21 @@ import invariant from 'invariant'; import * as React from 'react'; +import uuid from 'uuid'; import { type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; +import { + createMessagesToPeersFromDMOp, + type DMOperationSpecification, +} from '../shared/dm-ops/dm-op-utils.js'; import { IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; +import { scheduleP2PMessagesActionType } from '../types/dm-ops.js'; import { type OutboundP2PMessage, outboundP2PMessageStatuses, @@ -22,9 +28,14 @@ import { getConfig } from '../utils/config.js'; import { createOlmSessionWithPeer } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; +import { useDispatch, useSelector } from '../utils/redux-utils.js'; type PeerToPeerContextType = { - +processOutboundMessages: (messageIDs?: $ReadOnlyArray) => void, + +processOutboundMessages: ( + outboundMessageIDs: ?$ReadOnlyArray, + dmOpID: ?string, + ) => void, + +sendDMOperation: (op: DMOperationSpecification) => Promise, }; const PeerToPeerContext: React.Context = @@ -149,33 +160,84 @@ function PeerToPeerProvider(props: Props): React.Node { const { children } = props; - const processingQueue = React.useRef>>([]); - const promiseRunning = React.useRef(false); - const { sendMessageToDevice } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); + const dispatch = useDispatch(); + const dmOpsSendingPromiseResolvers = React.useRef< + Map mixed, +reject: Error => mixed }>, + >(new Map()); + const auxUserStore = useSelector(state => state.auxUserStore); + const currentUserInfo = useSelector(state => state.currentUserInfo); + + const sendDMOperation = React.useCallback( + async (op: DMOperationSpecification) => { + const dmOpID = uuid.v4(); + const promise = new Promise((resolve, reject) => { + dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject }); + }); + + const messages = await createMessagesToPeersFromDMOp( + op, + auxUserStore, + currentUserInfo, + ); + dispatch({ + type: scheduleP2PMessagesActionType, + payload: { + dmOpID, + messages, + }, + }); + + return promise; + }, + [auxUserStore, currentUserInfo, dispatch], + ); + + const processingQueue = React.useRef< + Array<{ + +outboundMessageIDs: ?$ReadOnlyArray, + +dmOpID: ?string, + }>, + >([]); + const promiseRunning = React.useRef(false); + const processOutboundMessages = React.useCallback( - (messageIDs?: $ReadOnlyArray) => { - processingQueue.current.push(messageIDs); + (outboundMessageIDs: ?$ReadOnlyArray, dmOpID: ?string) => { + processingQueue.current.push({ outboundMessageIDs, dmOpID }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { - const nextMessageIDs = processingQueue.current.shift(); + const queueFront = processingQueue.current.shift(); try { await processOutboundP2PMessages( sendMessageToDevice, identityContext, - nextMessageIDs, + queueFront?.outboundMessageIDs, ); + if (queueFront.dmOpID) { + dmOpsSendingPromiseResolvers.current + .get(queueFront.dmOpID) + ?.resolve?.(); + } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); + if (queueFront.dmOpID) { + dmOpsSendingPromiseResolvers.current + .get(queueFront.dmOpID) + ?.reject?.(e); + } + } finally { + if (queueFront.dmOpID) { + dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID); + } } } while (processingQueue.current.length > 0); promiseRunning.current = false; @@ -196,8 +258,9 @@ const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, + sendDMOperation, }), - [processOutboundMessages], + [processOutboundMessages, sendDMOperation], ); return ( diff --git a/lib/types/db-ops-types.js b/lib/types/db-ops-types.js --- a/lib/types/db-ops-types.js +++ b/lib/types/db-ops-types.js @@ -14,6 +14,7 @@ } | { +ops: StoreOperations, + +dmOpID?: string, }; export type DBOpsStore = { diff --git a/lib/types/dm-ops.js b/lib/types/dm-ops.js --- a/lib/types/dm-ops.js +++ b/lib/types/dm-ops.js @@ -4,6 +4,7 @@ import t from 'tcomb'; import type { RawMessageInfo } from './message-types.js'; +import type { OutboundP2PMessage } from './sqlite-types.js'; import { type NonSidebarThickThreadType, nonSidebarThickThreadTypes, @@ -154,3 +155,9 @@ +rawMessageInfos: $ReadOnlyArray, +updateInfos: $ReadOnlyArray, }; + +export const scheduleP2PMessagesActionType = 'SCHEDULE_P2P_MESSAGES'; +export type ScheduleP2PMessagesPayload = { + +dmOpID: string, + +messages: $ReadOnlyArray, +}; diff --git a/lib/types/redux-types.js b/lib/types/redux-types.js --- a/lib/types/redux-types.js +++ b/lib/types/redux-types.js @@ -40,7 +40,10 @@ GetVersionActionPayload, LastCommunicatedPlatformDetails, } from './device-types.js'; -import type { ProcessDMOpsPayload } from './dm-ops.js'; +import type { + ScheduleP2PMessagesPayload, + ProcessDMOpsPayload, +} from './dm-ops.js'; import type { DraftStore } from './draft-types.js'; import type { EnabledApps, SupportedApps } from './enabled-apps.js'; import type { @@ -1566,7 +1569,8 @@ | { +type: 'PROCESS_DM_OPS', +payload: ProcessDMOpsPayload, - }, + } + | { +type: 'SCHEDULE_P2P_MESSAGES', +payload: ScheduleP2PMessagesPayload }, }>; export type ActionPayload = ?(Object | Array<*> | $ReadOnlyArray<*> | string); diff --git a/web/shared-worker/utils/store.js b/web/shared-worker/utils/store.js --- a/web/shared-worker/utils/store.js +++ b/web/shared-worker/utils/store.js @@ -224,7 +224,8 @@ convertedUserStoreOperations.length === 0 && convertedMessageStoreOperations.length === 0 && convertedThreadActivityStoreOperations.length === 0 && - convertedEntryStoreOperations.length === 0 + convertedEntryStoreOperations.length === 0 && + outboundP2PMessages?.length === 0 ) { return; }