diff --git a/lib/hooks/actions-queue.js b/lib/hooks/actions-queue.js new file mode 100644 index 000000000..6d49f1bcd --- /dev/null +++ b/lib/hooks/actions-queue.js @@ -0,0 +1,44 @@ +// @flow + +import * as React from 'react'; + +type MessageQueueHook = { + +enqueue: (items: $ReadOnlyArray) => void, +}; + +function useActionsQueue( + performAction: (item: T) => mixed | Promise, +): MessageQueueHook { + const [queue, setQueue] = React.useState<$ReadOnlyArray>([]); + const isProcessing = React.useRef(false); + + const process = React.useCallback( + async (action: T) => { + isProcessing.current = true; + try { + await performAction(action); + } finally { + isProcessing.current = false; + setQueue(currentQueue => currentQueue.slice(1)); + } + }, + [performAction], + ); + + const enqueue = React.useCallback( + (items: $ReadOnlyArray) => + setQueue(prevQueue => [...prevQueue, ...items]), + [], + ); + + React.useEffect(() => { + if (isProcessing.current || queue.length === 0) { + return; + } + void process(queue[0]); + }, [process, queue]); + + return { enqueue }; +} + +export { useActionsQueue }; diff --git a/lib/shared/dm-ops/dm-ops-queue-handler.react.js b/lib/shared/dm-ops/dm-ops-queue-handler.react.js index 8883e72ec..6b845c7f4 100644 --- a/lib/shared/dm-ops/dm-ops-queue-handler.react.js +++ b/lib/shared/dm-ops/dm-ops-queue-handler.react.js @@ -1,194 +1,249 @@ // @flow import * as React from 'react'; import { dmOperationSpecificationTypes } from './dm-op-utils.js'; import { useProcessDMOperation } from './process-dm-ops.js'; +import { useActionsQueue } from '../../hooks/actions-queue.js'; import { messageInfoSelector } from '../../selectors/chat-selectors.js'; import { entryInfoSelector, threadInfoSelector, } from '../../selectors/thread-selectors.js'; import { clearQueuedEntryDMOpsActionType, clearQueuedMembershipDMOpsActionType, clearQueuedMessageDMOpsActionType, clearQueuedThreadDMOpsActionType, pruneDMOpsQueueActionType, } from '../../types/dm-ops.js'; -import type { OperationsQueue } from '../../types/dm-ops.js'; +import type { DMOperation } from '../../types/dm-ops.js'; +import type { BaseAction } from '../../types/redux-types.js'; import { useDispatch, useSelector } from '../../utils/redux-utils.js'; const PRUNING_FREQUENCY = 60 * 60 * 1000; const FIRST_PRUNING_DELAY = 10 * 60 * 1000; const QUEUED_OPERATION_TTL = 3 * 24 * 60 * 60 * 1000; +type QueueItem = + | { + +type: 'operation', + +operation: DMOperation, + } + | { + +type: 'action', + +action: BaseAction, + } + | { + +type: 'function', + +itemFunction: () => mixed, + }; + function DMOpsQueueHandler(): React.Node { const dispatch = useDispatch(); const prune = React.useCallback(() => { const now = Date.now(); dispatch({ type: pruneDMOpsQueueActionType, payload: { pruneMaxTimestamp: now - QUEUED_OPERATION_TTL, }, }); }, [dispatch]); React.useEffect(() => { const timeoutID = setTimeout(prune, FIRST_PRUNING_DELAY); const intervalID = setInterval(prune, PRUNING_FREQUENCY); return () => { clearTimeout(timeoutID); clearInterval(intervalID); }; }, [prune]); const threadInfos = useSelector(threadInfoSelector); const prevThreadInfosRef = React.useRef({}); const queuedThreadOperations = useSelector( state => state.queuedDMOperations.threadQueue, ); const processDMOperation = useProcessDMOperation(); - const processOperationsQueue = React.useCallback( - (queue: OperationsQueue) => { - for (const dmOp of queue) { - void processDMOperation({ + + const processItem = React.useCallback( + async (item: QueueItem) => { + if (item.type === 'operation') { + await processDMOperation({ // This is `INBOUND` because we assume that when generating // `dmOperationSpecificationTypes.OUBOUND` it should be possible // to be processed and never queued up. type: dmOperationSpecificationTypes.INBOUND, - op: dmOp.operation, + op: item.operation, // There is no metadata, because messages were confirmed when // adding to the queue. metadata: null, }); + } else if (item.type === 'action') { + dispatch(item.action); + } else { + item.itemFunction(); } }, - [processDMOperation], + [dispatch, processDMOperation], ); + const { enqueue } = useActionsQueue(processItem); + React.useEffect(() => { const prevThreadInfos = prevThreadInfosRef.current; prevThreadInfosRef.current = threadInfos; for (const threadID in queuedThreadOperations) { if (!threadInfos[threadID] || prevThreadInfos[threadID]) { continue; } - processOperationsQueue(queuedThreadOperations[threadID]); - dispatch({ - type: clearQueuedThreadDMOpsActionType, - payload: { - threadID, + enqueue([ + ...queuedThreadOperations[threadID].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedThreadDMOpsActionType, + payload: { + threadID, + }, + }, }, - }); + ]); } - }, [dispatch, processOperationsQueue, queuedThreadOperations, threadInfos]); + }, [dispatch, enqueue, queuedThreadOperations, threadInfos]); const messageInfos = useSelector(messageInfoSelector); const prevMessageInfosRef = React.useRef({}); const queuedMessageOperations = useSelector( state => state.queuedDMOperations.messageQueue, ); React.useEffect(() => { const prevMessageInfos = prevMessageInfosRef.current; prevMessageInfosRef.current = messageInfos; for (const messageID in queuedMessageOperations) { if (!messageInfos[messageID] || prevMessageInfos[messageID]) { continue; } - processOperationsQueue(queuedMessageOperations[messageID]); - dispatch({ - type: clearQueuedMessageDMOpsActionType, - payload: { - messageID, + enqueue([ + ...queuedMessageOperations[messageID].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedMessageDMOpsActionType, + payload: { + messageID, + }, + }, }, - }); + ]); } - }, [dispatch, messageInfos, processOperationsQueue, queuedMessageOperations]); + }, [dispatch, enqueue, messageInfos, queuedMessageOperations]); const entryInfos = useSelector(entryInfoSelector); const prevEntryInfosRef = React.useRef({}); const queuedEntryOperations = useSelector( state => state.queuedDMOperations.entryQueue, ); React.useEffect(() => { const prevEntryInfos = prevEntryInfosRef.current; prevEntryInfosRef.current = entryInfos; for (const entryID in queuedEntryOperations) { if (!entryInfos[entryID] || prevEntryInfos[entryID]) { continue; } - processOperationsQueue(queuedEntryOperations[entryID]); - dispatch({ - type: clearQueuedEntryDMOpsActionType, - payload: { - entryID, + enqueue([ + ...queuedEntryOperations[entryID].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedEntryDMOpsActionType, + payload: { + entryID, + }, + }, }, - }); + ]); } - }, [dispatch, entryInfos, processOperationsQueue, queuedEntryOperations]); + }, [dispatch, enqueue, entryInfos, queuedEntryOperations]); const queuedMembershipOperations = useSelector( state => state.queuedDMOperations.membershipQueue, ); const runningMembershipOperations = React.useRef>>( new Map(), ); React.useEffect(() => { for (const threadID in queuedMembershipOperations) { if (!threadInfos[threadID]) { continue; } const queuedMemberIDs = new Set( Object.keys(queuedMembershipOperations[threadID]), ); if (!runningMembershipOperations.current.has(threadID)) { runningMembershipOperations.current.set(threadID, new Set()); } for (const member of threadInfos[threadID].members) { if ( !queuedMemberIDs.has(member.id) || runningMembershipOperations.current.get(threadID)?.has(member.id) ) { continue; } runningMembershipOperations.current.get(threadID)?.add(member.id); - processOperationsQueue(queuedMembershipOperations[threadID][member.id]); - - dispatch({ - type: clearQueuedMembershipDMOpsActionType, - payload: { - threadID, - userID: member.id, + enqueue([ + ...queuedMembershipOperations[threadID][member.id].map(item => ({ + type: 'operation', + operation: item.operation, + })), + { + type: 'action', + action: { + type: clearQueuedMembershipDMOpsActionType, + payload: { + threadID, + userID: member.id, + }, + }, }, - }); - runningMembershipOperations.current.get(threadID)?.delete(member.id); + { + type: 'function', + itemFunction: () => + runningMembershipOperations.current + .get(threadID) + ?.delete(member.id), + }, + ]); } } - }, [ - dispatch, - processOperationsQueue, - queuedMembershipOperations, - threadInfos, - ]); + }, [dispatch, enqueue, queuedMembershipOperations, threadInfos]); return null; } export { DMOpsQueueHandler };