// Patch summary (header text placed before the first `diff --git` line).
//
// Purpose: lets a caller send a DM operation to peers and await the result
// of the outbound-message processing for that operation.
//
// Files touched and what each hunk does:
//  * lib/types/dm-ops.js
//      Adds the `scheduleP2PMessagesActionType` constant
//      ('SCHEDULE_P2P_MESSAGES') and the `ScheduleP2PMessages` payload type
//      with the fields `id` and `messages`.
//  * lib/types/redux-types.js
//      Adds an action variant whose type is 'SCHEDULE_P2P_MESSAGES' and whose
//      payload is `ScheduleP2PMessages`.
//  * lib/types/db-ops-types.js
//      Adds an optional `dmID` string to the `{ ops }` variant of a queue
//      entry.
//  * lib/reducers/db-ops-reducer.js
//      On `scheduleP2PMessagesActionType`, appends a new entry to
//      `store.queuedOps` with `ops.outboundP2PMessages` taken from
//      `action.payload.messages` and `dmID` taken from `action.payload.id`.
//  * lib/handlers/db-ops-handler.react.js
//      Also destructures `dmID` from the queue front and forwards it as the
//      second argument of `processOutboundMessages`.
//  * lib/tunnelbroker/peer-to-peer-context.js
//      - `processOutboundMessages` now takes `(outboundMessageIDs, dmID)` and
//        queues `{ outboundMessageIDs, dmID }` objects instead of bare ID
//        arrays.
//      - Adds `sendDMOperation(op)`: generates an ID with `getUUID()`, stores
//        the `{ resolve, reject }` pair of a new promise in the
//        `dmOpsSendingPromiseResolvers` ref Map under that ID, builds the
//        messages with `createMessagesToPeersFromDMOp(op, auxUserStore,
//        currentUserInfo)`, dispatches `scheduleP2PMessagesActionType` with
//        `{ id, messages }`, and returns the promise.
//      - In the processing loop, after `processOutboundP2PMessages` returns,
//        the resolver stored under the entry's `dmID` (if any) is resolved;
//        when an exception is caught it is rejected instead; the `finally`
//        branch deletes the Map entry in both cases.
//      - `sendDMOperation` is exposed through the context value and added to
//        the `useMemo` dependency list.
//
// NOTE(review): this copy of the patch has its newlines collapsed and the
// generic type arguments look stripped (e.g. `$ReadOnlyArray,`, `Promise,`,
// `React.Context =`), so it presumably will not apply as-is. Regenerate it
// from the original commit before using it.
// NOTE(review): the loop reads `queueFront?.outboundMessageIDs` with optional
// chaining but then reads `queueFront.dmID` unguarded in the try, catch and
// finally branches. The entry is pushed before the loop starts and the loop
// only repeats while the queue is non-empty, so `queueFront` looks always
// defined. Confirm, then make the guards consistent; a throw inside
// catch/finally would skip `promiseRunning.current = false`.
// NOTE(review): `reject` is typed `() => mixed` and called without arguments,
// so a caller awaiting `sendDMOperation` gets a rejection with no reason; the
// caught error is only logged. Confirm that this is intended.
// NOTE(review): in `sendDMOperation` the resolver pair is registered before
// `createMessagesToPeersFromDMOp` and `dispatch` run. If either can throw,
// the Map entry is never removed and the promise never settles (TODO confirm).

diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js --- a/lib/handlers/db-ops-handler.react.js +++ b/lib/handlers/db-ops-handler.react.js @@ -36,7 +36,7 @@ } prevQueueFront.current = queueFront; - const { ops, messageSourceMetadata } = queueFront; + const { ops, messageSourceMetadata, dmID } = queueFront; void (async () => { if (ops) { await processDBStoreOperations(ops); @@ -44,7 +44,7 @@ const messageIDs = ops.outboundP2PMessages.map( message => message.messageID, ); - processOutboundMessages(messageIDs); + processOutboundMessages(messageIDs, dmID); } } dispatch({ diff --git a/lib/reducers/db-ops-reducer.js b/lib/reducers/db-ops-reducer.js --- a/lib/reducers/db-ops-reducer.js +++ b/lib/reducers/db-ops-reducer.js @@ -5,6 +5,7 @@ MessageSourceMetadata, DBOpsStore, } from '../types/db-ops-types.js'; +import { scheduleP2PMessagesActionType } from '../types/dm-ops.js'; import type { BaseAction } from '../types/redux-types.js'; import type { StoreOperations } from '../types/store-ops-types.js'; import { values } from '../utils/objects.js'; @@ -16,6 +17,17 @@ ...store, queuedOps: rest, }; + } else if (action.type === scheduleP2PMessagesActionType) { + const newEntry = { + ops: { + outboundP2PMessages: action.payload.messages, + }, + dmID: action.payload.id, + }; + return { + ...store, + queuedOps: [...store.queuedOps, newEntry], + }; } return store; diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -7,10 +7,13 @@ type TunnelbrokerClientMessageToDevice, useTunnelbroker, } from './tunnelbroker-context.js'; +import type { DMOperationSpecification } from '../shared/dm-ops/dm-op-utils.js'; +import { createMessagesToPeersFromDMOp } from '../shared/dm-ops/dm-op-utils.js'; import { IdentityClientContext, type IdentityClientContextType, } from '../shared/identity-client-context.js'; +import { 
scheduleP2PMessagesActionType } from '../types/dm-ops.js'; import { type OutboundP2PMessage, outboundP2PMessageStatuses, @@ -22,9 +25,15 @@ import { getConfig } from '../utils/config.js'; import { createOlmSessionWithPeer } from '../utils/crypto-utils.js'; import { getMessageForException } from '../utils/errors.js'; +import { useDispatch, useSelector } from '../utils/redux-utils.js'; +import { getUUID } from '../utils/uuid.js'; type PeerToPeerContextType = { - +processOutboundMessages: (messageIDs?: $ReadOnlyArray) => void, + +processOutboundMessages: ( + outboundMessageIDs: ?$ReadOnlyArray, + dmID: ?string, + ) => void, + +sendDMOperation: (op: DMOperationSpecification) => Promise, }; const PeerToPeerContext: React.Context = @@ -149,33 +158,84 @@ function PeerToPeerProvider(props: Props): React.Node { const { children } = props; - const processingQueue = React.useRef>>([]); - const promiseRunning = React.useRef(false); - const { sendMessage } = useTunnelbroker(); const identityContext = React.useContext(IdentityClientContext); invariant(identityContext, 'Identity context should be set'); + const dispatch = useDispatch(); + const dmOpsSendingPromiseResolvers = React.useRef< + Map mixed, +reject: () => mixed }>, + >(new Map()); + const auxUserStore = useSelector(state => state.auxUserStore); + const currentUserInfo = useSelector(state => state.currentUserInfo); + + const sendDMOperation = React.useCallback( + (op: DMOperationSpecification) => { + const id = getUUID(); + const promise = new Promise((resolve, reject) => { + dmOpsSendingPromiseResolvers.current.set(id, { resolve, reject }); + }); + + const messages = createMessagesToPeersFromDMOp( + op, + auxUserStore, + currentUserInfo, + ); + dispatch({ + type: scheduleP2PMessagesActionType, + payload: { + id, + messages, + }, + }); + + return promise; + }, + [auxUserStore, currentUserInfo, dispatch], + ); + + const processingQueue = React.useRef< + Array<{ + +outboundMessageIDs: ?$ReadOnlyArray, + +dmID: ?string, + 
}>, + >([]); + const promiseRunning = React.useRef(false); + const processOutboundMessages = React.useCallback( - (messageIDs?: $ReadOnlyArray) => { - processingQueue.current.push(messageIDs); + (outboundMessageIDs: ?$ReadOnlyArray, dmID: ?string) => { + processingQueue.current.push({ outboundMessageIDs, dmID }); if (!promiseRunning.current) { promiseRunning.current = true; void (async () => { do { - const nextMessageIDs = processingQueue.current.shift(); + const queueFront = processingQueue.current.shift(); try { await processOutboundP2PMessages( sendMessage, identityContext, - nextMessageIDs, + queueFront?.outboundMessageIDs, ); + if (queueFront.dmID) { + dmOpsSendingPromiseResolvers.current + .get(queueFront.dmID) + ?.resolve?.(); + } } catch (e) { console.log( `Error processing outbound P2P messages: ${ getMessageForException(e) ?? 'unknown' }`, ); + if (queueFront.dmID) { + dmOpsSendingPromiseResolvers.current + .get(queueFront.dmID) + ?.reject?.(); + } + } finally { + if (queueFront.dmID) { + dmOpsSendingPromiseResolvers.current.delete(queueFront.dmID); + } } } while (processingQueue.current.length > 0); promiseRunning.current = false; @@ -196,8 +256,9 @@ const value: PeerToPeerContextType = React.useMemo( () => ({ processOutboundMessages, + sendDMOperation, }), - [processOutboundMessages], + [processOutboundMessages, sendDMOperation], ); return ( diff --git a/lib/types/db-ops-types.js b/lib/types/db-ops-types.js --- a/lib/types/db-ops-types.js +++ b/lib/types/db-ops-types.js @@ -14,6 +14,7 @@ } | { +ops: StoreOperations, + +dmID?: string, }; export type DBOpsStore = { diff --git a/lib/types/dm-ops.js b/lib/types/dm-ops.js --- a/lib/types/dm-ops.js +++ b/lib/types/dm-ops.js @@ -4,6 +4,7 @@ import t from 'tcomb'; import type { RawMessageInfo } from './message-types.js'; +import type { OutboundP2PMessage } from './sqlite-types.js'; import { type NonSidebarThickThreadType, nonSidebarThickThreadTypes, @@ -94,3 +95,9 @@ rawMessageInfos: Array, updateInfos: Array, 
}; + +export const scheduleP2PMessagesActionType = 'SCHEDULE_P2P_MESSAGES'; +export type ScheduleP2PMessages = { + +id: string, + +messages: $ReadOnlyArray, +}; diff --git a/lib/types/redux-types.js b/lib/types/redux-types.js --- a/lib/types/redux-types.js +++ b/lib/types/redux-types.js @@ -40,6 +40,7 @@ GetVersionActionPayload, LastCommunicatedPlatformDetails, } from './device-types.js'; +import type { ScheduleP2PMessages } from './dm-ops.js'; import type { DraftStore } from './draft-types.js'; import type { EnabledApps, SupportedApps } from './enabled-apps.js'; import type { @@ -1561,7 +1562,8 @@ +error: true, +payload: Error, +loadingInfo: LoadingInfo, - }, + } + | { +type: 'SCHEDULE_P2P_MESSAGES', +payload: ScheduleP2PMessages }, }>; export type ActionPayload = ?(Object | Array<*> | $ReadOnlyArray<*> | string);