diff --git a/lib/reducers/dm-operations-queue-reducer.js b/lib/reducers/dm-operations-queue-reducer.js --- a/lib/reducers/dm-operations-queue-reducer.js +++ b/lib/reducers/dm-operations-queue-reducer.js @@ -3,6 +3,7 @@ import _mapValues from 'lodash/fp/mapValues.js'; import { + clearDMOpsThreadQueueActionType, pruneDMOpsQueueActionType, type QueuedDMOperations, queueDMOpsActionType, @@ -34,6 +35,13 @@ ), )(store.operations), }; + } else if (action.type === clearDMOpsThreadQueueActionType) { + const { [action.payload.threadID]: removed, ...operations } = + store.operations; + return { + ...store, + operations, + }; } return store; } diff --git a/lib/shared/dm-ops/dm-ops-queue-handler.react.js b/lib/shared/dm-ops/dm-ops-queue-handler.react.js new file mode 100644 --- /dev/null +++ b/lib/shared/dm-ops/dm-ops-queue-handler.react.js @@ -0,0 +1,89 @@ +// @flow + +import * as React from 'react'; + +import { useProcessDMOperation } from './process-dm-ops.js'; +import { threadInfoSelector } from '../../selectors/thread-selectors.js'; +import { + clearDMOpsThreadQueueActionType, + pruneDMOpsQueueActionType, +} from '../../types/dm-ops.js'; +import { useDispatch, useSelector } from '../../utils/redux-utils.js'; + +const PRUNING_FREQUENCY = 60 * 60 * 1000; +const FIRST_PRUNING_DELAY = 10 * 60 * 1000; +const QUEUED_OPERATION_TTL = 3 * 24 * 60 * 60 * 1000; + +function DMOpsQueueHandler(): React.Node { + const dispatch = useDispatch(); + + const prune = React.useCallback(() => { + const now = Date.now(); + dispatch({ + type: pruneDMOpsQueueActionType, + payload: { + pruneMaxTimestamp: now - QUEUED_OPERATION_TTL, + }, + }); + }, [dispatch]); + + React.useEffect(() => { + const timeoutID = setTimeout(prune, FIRST_PRUNING_DELAY); + const intervalID = setInterval(prune, PRUNING_FREQUENCY); + + return () => { + clearTimeout(timeoutID); + clearInterval(intervalID); + }; + }, [prune]); + + const threadInfos = useSelector(threadInfoSelector); + const threadIDs = React.useMemo( + () => new Set(Object.keys(threadInfos)), + [threadInfos], + ); + const prevThreadIDsRef = React.useRef<$ReadOnlySet>(new Set()); + + const queuedOperations = useSelector( + state => state.queuedDMOperations.operations, + ); + const queuedOperationsThreadIDs = React.useMemo( + () => Object.keys(queuedOperations), + [queuedOperations], + ); + + const processDMOperation = useProcessDMOperation(); + + React.useEffect(() => { + void (async () => { + const prevThreadIDs = prevThreadIDsRef.current; + prevThreadIDsRef.current = threadIDs; + + for (const threadID of queuedOperationsThreadIDs) { + if (!threadIDs.has(threadID) || prevThreadIDs.has(threadID)) { + continue; + } + for (const dmOp of queuedOperations[threadID]) { + await processDMOperation(dmOp.operation); + } + + dispatch({ + type: clearDMOpsThreadQueueActionType, + payload: { + threadID, + }, + }); + } + })(); + }, [ + dispatch, + processDMOperation, + queuedOperations, + queuedOperationsThreadIDs, + threadIDs, + ]); + + return null; +} + +export { DMOpsQueueHandler }; diff --git a/lib/shared/dm-ops/dm-ops-queue-pruner.react.js b/lib/shared/dm-ops/dm-ops-queue-pruner.react.js deleted file mode 100644 --- a/lib/shared/dm-ops/dm-ops-queue-pruner.react.js +++ /dev/null @@ -1,37 +0,0 @@ -// @flow -import * as React from 'react'; - -import { pruneDMOpsQueueActionType } from '../../types/dm-ops.js'; -import { useDispatch } from '../../utils/redux-utils.js'; - -const PRUNING_FREQUENCY = 60 * 60 * 1000; -const FIRST_PRUNING_DELAY = 10 * 60 * 1000; -const QUEUED_OPERATION_TTL = 3 * 24 * 60 * 60 * 1000; - -function DMOpsQueuePruner(): React.Node { - const dispatch = useDispatch(); - - const prune = React.useCallback(() => { - const now = Date.now(); - dispatch({ - type: pruneDMOpsQueueActionType, - payload: { - pruneMaxTime: now - QUEUED_OPERATION_TTL, - }, - }); - }, [dispatch]); - - React.useEffect(() => { - const timeoutID = setTimeout(prune, FIRST_PRUNING_DELAY); - const intervalID = setInterval(prune, PRUNING_FREQUENCY); - - return () => { - clearTimeout(timeoutID); - clearInterval(intervalID); - }; - }, [prune]); - - return null; -} - -export { DMOpsQueuePruner }; diff --git a/lib/tunnelbroker/tunnelbroker-context.js b/lib/tunnelbroker/tunnelbroker-context.js --- a/lib/tunnelbroker/tunnelbroker-context.js +++ b/lib/tunnelbroker/tunnelbroker-context.js @@ -9,7 +9,7 @@ import { PeerToPeerMessageHandler } from './peer-to-peer-message-handler.js'; import type { SecondaryTunnelbrokerConnection } from './secondary-tunnelbroker-connection.js'; import { tunnnelbrokerURL } from '../facts/tunnelbroker.js'; -import { DMOpsQueuePruner } from '../shared/dm-ops/dm-ops-queue-pruner.react.js'; +import { DMOpsQueueHandler } from '../shared/dm-ops/dm-ops-queue-handler.react.js'; import { IdentityClientContext } from '../shared/identity-client-context.js'; import { tunnelbrokerHeartbeatTimeout } from '../shared/timeouts.js'; import { isWebPlatform } from '../types/device-types.js'; @@ -462,7 +462,7 @@ socketSend={socketSend} /> {children} - + ); } diff --git a/lib/types/dm-ops.js b/lib/types/dm-ops.js --- a/lib/types/dm-ops.js +++ b/lib/types/dm-ops.js @@ -327,6 +327,11 @@ +messages: $ReadOnlyArray, }; +export const clearDMOpsThreadQueueActionType = 'CLEAR_DM_OPS_THREAD_QUEUE'; +export type ClearDMOpsThreadQueuePayload = { + +threadID: string, +}; + export type QueuedDMOperations = { +operations: { +[threadID: string]: $ReadOnlyArray<{ diff --git a/lib/types/redux-types.js b/lib/types/redux-types.js --- a/lib/types/redux-types.js +++ b/lib/types/redux-types.js @@ -46,6 +46,7 @@ QueuedDMOperations, QueueDMOpsPayload, PruneDMOpsQueuePayload, + ClearDMOpsThreadQueuePayload, } from './dm-ops.js'; import type { DraftStore } from './draft-types.js'; import type { EnabledApps, SupportedApps } from './enabled-apps.js'; @@ -1583,7 +1584,11 @@ }, } | { +type: 'QUEUE_DM_OPS', +payload: QueueDMOpsPayload } - | { +type: 'PRUNE_DM_OPS_QUEUE', +payload: PruneDMOpsQueuePayload }, + | { +type: 'PRUNE_DM_OPS_QUEUE', +payload: PruneDMOpsQueuePayload } + | { + +type: 'CLEAR_DM_OPS_THREAD_QUEUE', + +payload: ClearDMOpsThreadQueuePayload, + }, }>; export type ActionPayload = ?(Object | Array<*> | $ReadOnlyArray<*> | string);