diff --git a/lib/hooks/actions-queue.js b/lib/hooks/actions-queue.js new file mode 100644 --- /dev/null +++ b/lib/hooks/actions-queue.js @@ -0,0 +1,44 @@ +// @flow + +import * as React from 'react'; + +type MessageQueueHook = { + +enqueue: (items: $ReadOnlyArray) => void, +}; + +function useActionsQueue( + performAction: (item: T) => mixed | Promise, +): MessageQueueHook { + const [queue, setQueue] = React.useState<$ReadOnlyArray>([]); + const isProcessing = React.useRef(false); + + const process = React.useCallback( + async (action: T) => { + isProcessing.current = true; + try { + await performAction(action); + } finally { + isProcessing.current = false; + setQueue(currentQueue => currentQueue.slice(1)); + } + }, + [performAction], + ); + + const enqueue = React.useCallback( + (items: $ReadOnlyArray) => + setQueue(prevQueue => [...prevQueue, ...items]), + [], + ); + + React.useEffect(() => { + if (isProcessing.current || queue.length === 0) { + return; + } + void process(queue[0]); + }, [process, queue]); + + return { enqueue }; +} + +export { useActionsQueue }; diff --git a/lib/shared/dm-ops/dm-ops-queue-handler.react.js b/lib/shared/dm-ops/dm-ops-queue-handler.react.js --- a/lib/shared/dm-ops/dm-ops-queue-handler.react.js +++ b/lib/shared/dm-ops/dm-ops-queue-handler.react.js @@ -4,6 +4,7 @@ import { dmOperationSpecificationTypes } from './dm-op-utils.js'; import { useProcessDMOperation } from './process-dm-ops.js'; +import { useActionsQueue } from '../../hooks/actions-queue.js'; import { messageInfoSelector } from '../../selectors/chat-selectors.js'; import { entryInfoSelector, @@ -16,13 +17,28 @@ clearQueuedThreadDMOpsActionType, pruneDMOpsQueueActionType, } from '../../types/dm-ops.js'; -import type { OperationsQueue } from '../../types/dm-ops.js'; +import type { DMOperation } from '../../types/dm-ops.js'; +import type { BaseAction } from '../../types/redux-types.js'; import { useDispatch, useSelector } from '../../utils/redux-utils.js'; const PRUNING_FREQUENCY = 60 * 60 * 1000; const FIRST_PRUNING_DELAY = 10 * 60 * 1000; const QUEUED_OPERATION_TTL = 3 * 24 * 60 * 60 * 1000; +type QueueItem = + | { + +type: 'operation', + +operation: DMOperation, + } + | { + +type: 'action', + +action: BaseAction, + } + | { + +type: 'function', + +itemFunction: () => mixed, + }; + function DMOpsQueueHandler(): React.Node { const dispatch = useDispatch(); @@ -54,24 +70,31 @@ ); const processDMOperation = useProcessDMOperation(); - const processOperationsQueue = React.useCallback( - (queue: OperationsQueue) => { - for (const dmOp of queue) { - void processDMOperation({ + + const processItem = React.useCallback( + async (item: QueueItem) => { + if (item.type === 'operation') { + await processDMOperation({ // This is `INBOUND` because we assume that when generating // `dmOperationSpecificationTypes.OUBOUND` it should be possible // to be processed and never queued up. type: dmOperationSpecificationTypes.INBOUND, - op: dmOp.operation, + op: item.operation, // There is no metadata, because messages were confirmed when // adding to the queue. metadata: null, }); + } else if (item.type === 'action') { + dispatch(item.action); + } else { + item.itemFunction(); } }, - [processDMOperation], + [dispatch, processDMOperation], ); + const { enqueue } = useActionsQueue(processItem); + React.useEffect(() => { const prevThreadInfos = prevThreadInfosRef.current; prevThreadInfosRef.current = threadInfos; @@ -80,15 +103,23 @@ if (!threadInfos[threadID] || prevThreadInfos[threadID]) { continue; } - processOperationsQueue(queuedThreadOperations[threadID]); - dispatch({ - type: clearQueuedThreadDMOpsActionType, - payload: { - threadID, + enqueue([ + ...queuedThreadOperations[threadID].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedThreadDMOpsActionType, + payload: { + threadID, + }, + }, }, - }); + ]); } - }, [dispatch, processOperationsQueue, queuedThreadOperations, threadInfos]); + }, [dispatch, enqueue, queuedThreadOperations, threadInfos]); const messageInfos = useSelector(messageInfoSelector); const prevMessageInfosRef = React.useRef({}); @@ -105,15 +136,23 @@ if (!messageInfos[messageID] || prevMessageInfos[messageID]) { continue; } - processOperationsQueue(queuedMessageOperations[messageID]); - dispatch({ - type: clearQueuedMessageDMOpsActionType, - payload: { - messageID, + enqueue([ + ...queuedMessageOperations[messageID].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedMessageDMOpsActionType, + payload: { + messageID, + }, + }, }, - }); + ]); } - }, [dispatch, messageInfos, processOperationsQueue, queuedMessageOperations]); + }, [dispatch, enqueue, messageInfos, queuedMessageOperations]); const entryInfos = useSelector(entryInfoSelector); const prevEntryInfosRef = React.useRef({}); @@ -130,15 +169,23 @@ if (!entryInfos[entryID] || prevEntryInfos[entryID]) { continue; } - processOperationsQueue(queuedEntryOperations[entryID]); - dispatch({ - type: clearQueuedEntryDMOpsActionType, - payload: { - entryID, + enqueue([ + ...queuedEntryOperations[entryID].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedEntryDMOpsActionType, + payload: { + entryID, + }, + }, }, - }); + ]); } - }, [dispatch, entryInfos, processOperationsQueue, queuedEntryOperations]); + }, [dispatch, enqueue, entryInfos, queuedEntryOperations]); const queuedMembershipOperations = useSelector( state => state.queuedDMOperations.membershipQueue, @@ -169,24 +216,32 @@ runningMembershipOperations.current.get(threadID)?.add(member.id); - processOperationsQueue(queuedMembershipOperations[threadID][member.id]); - - dispatch({ - type: clearQueuedMembershipDMOpsActionType, - payload: { - threadID, - userID: member.id, + enqueue([ + ...queuedMembershipOperations[threadID][member.id].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedMembershipDMOpsActionType, + payload: { + threadID, + userID: member.id, + }, + }, }, - }); - runningMembershipOperations.current.get(threadID)?.delete(member.id); + { + type: 'function', + itemFunction: () => + runningMembershipOperations.current + .get(threadID) + ?.delete(member.id), + }, + ]); } } - }, [ - dispatch, - processOperationsQueue, - queuedMembershipOperations, - threadInfos, - ]); + }, [dispatch, enqueue, queuedMembershipOperations, threadInfos]); return null; }