Page MenuHomePhabricator

D13698.id45398.diff
No OneTemporary

D13698.id45398.diff

diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js
--- a/lib/tunnelbroker/peer-to-peer-context.js
+++ b/lib/tunnelbroker/peer-to-peer-context.js
@@ -9,6 +9,7 @@
useTunnelbroker,
} from './tunnelbroker-context.js';
import { usePeerOlmSessionsCreatorContext } from '../components/peer-olm-session-creator-provider.react.js';
+import { useActionsQueue } from '../hooks/actions-queue.js';
import { useSendPushNotifs } from '../push/send-hooks.react.js';
import { getAllPeerDevices } from '../selectors/user-selectors.js';
import {
@@ -117,7 +118,7 @@
devices: $ReadOnlyArray<DeviceSessionCreationRequest>,
) => Promise<void>,
messageIDs: ?$ReadOnlyArray<string>,
- allPeerDevices: Set<string>,
+ allPeerDevices: $ReadOnlySet<string>,
): Promise<ProcessOutboundP2PMessagesResult> {
let authMetadata;
try {
@@ -240,6 +241,13 @@
const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000;
+type QueuedMessage = {
+ +outboundMessageIDs: ?$ReadOnlyArray<string>,
+ +dmOpID: ?string,
+ +notificationsCreationData: ?NotificationsCreationData,
+ +allPeerDevices: $ReadOnlySet<string>,
+};
+
function PeerToPeerProvider(props: Props): React.Node {
const { children } = props;
@@ -271,16 +279,6 @@
return { promise, dmOpID };
}, []);
- const processingQueue = React.useRef<
- Array<{
- +outboundMessageIDs: ?$ReadOnlyArray<string>,
- +dmOpID: ?string,
- +notificationsCreationData: ?NotificationsCreationData,
- +allPeerDevices: Set<string>,
- }>,
- >([]);
- const promiseRunning = React.useRef<boolean>(false);
-
const { createOlmSessionsWithUser: peerOlmSessionsCreator } =
usePeerOlmSessionsCreatorContext();
const sendPushNotifs = useSendPushNotifs();
@@ -290,74 +288,63 @@
[allPeerDevices],
);
- const processOutboundMessagesLatestVersionRef = React.useRef(-1);
+ const processItem = React.useCallback(
+ async (message: QueuedMessage) => {
+ const { dmOpID } = message;
+ try {
+ const [result] = await Promise.all([
+ processOutboundP2PMessages(
+ sendMessageToDevice,
+ identityContext,
+ peerOlmSessionsCreator,
+ message.outboundMessageIDs,
+ message.allPeerDevices,
+ ),
+ sendPushNotifs(message.notificationsCreationData),
+ ]);
+ if (dmOpID) {
+ dmOpsSendingPromiseResolvers.current.get(dmOpID)?.resolve?.(result);
+ }
+ } catch (e) {
+ console.log(
+ `Error processing outbound P2P messages: ${
+ getMessageForException(e) ?? 'unknown'
+ }`,
+ );
+ if (dmOpID) {
+ dmOpsSendingPromiseResolvers.current.get(dmOpID)?.reject?.(e);
+ }
+ } finally {
+ if (dmOpID) {
+ dmOpsSendingPromiseResolvers.current.delete(dmOpID);
+ }
+ }
+ },
+ [
+ identityContext,
+ peerOlmSessionsCreator,
+ sendMessageToDevice,
+ sendPushNotifs,
+ ],
+ );
+ const { enqueue } = useActionsQueue(processItem);
+
const processOutboundMessages = React.useMemo(() => {
- const currentVersion = ++processOutboundMessagesLatestVersionRef.current;
return (
outboundMessageIDs: ?$ReadOnlyArray<string>,
dmOpID: ?string,
notificationsCreationData: ?NotificationsCreationData,
) => {
- processingQueue.current.push({
- outboundMessageIDs,
- dmOpID,
- notificationsCreationData,
- allPeerDevices: allPeerDevicesSet,
- });
- if (!promiseRunning.current) {
- promiseRunning.current = true;
- void (async () => {
- do {
- if (
- currentVersion !== processOutboundMessagesLatestVersionRef.current
- ) {
- break;
- }
- const queueFront = processingQueue.current.shift();
- try {
- const [result] = await Promise.all([
- processOutboundP2PMessages(
- sendMessageToDevice,
- identityContext,
- peerOlmSessionsCreator,
- queueFront?.outboundMessageIDs,
- queueFront.allPeerDevices,
- ),
- sendPushNotifs(queueFront.notificationsCreationData),
- ]);
- if (queueFront.dmOpID) {
- dmOpsSendingPromiseResolvers.current
- .get(queueFront.dmOpID)
- ?.resolve?.(result);
- }
- } catch (e) {
- console.log(
- `Error processing outbound P2P messages: ${
- getMessageForException(e) ?? 'unknown'
- }`,
- );
- if (queueFront.dmOpID) {
- dmOpsSendingPromiseResolvers.current
- .get(queueFront.dmOpID)
- ?.reject?.(e);
- }
- } finally {
- if (queueFront.dmOpID) {
- dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID);
- }
- }
- } while (processingQueue.current.length > 0);
- promiseRunning.current = false;
- })();
- }
+ enqueue([
+ {
+ outboundMessageIDs,
+ dmOpID,
+ notificationsCreationData,
+ allPeerDevices: allPeerDevicesSet,
+ },
+ ]);
};
- }, [
- allPeerDevicesSet,
- sendMessageToDevice,
- identityContext,
- peerOlmSessionsCreator,
- sendPushNotifs,
- ]);
+ }, [enqueue, allPeerDevicesSet]);
const broadcastEphemeralMessage = React.useCallback(
async (

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 2:07 AM (4 h, 48 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2554860
Default Alt Text
D13698.id45398.diff (5 KB)

Event Timeline