Page MenuHomePhabricator

D12786.diff
No OneTemporary

D12786.diff

diff --git a/lib/handlers/db-ops-handler.react.js b/lib/handlers/db-ops-handler.react.js
--- a/lib/handlers/db-ops-handler.react.js
+++ b/lib/handlers/db-ops-handler.react.js
@@ -36,7 +36,7 @@
}
prevQueueFront.current = queueFront;
- const { ops, messageSourceMetadata } = queueFront;
+ const { ops, messageSourceMetadata, dmOpID } = queueFront;
void (async () => {
if (ops) {
await processDBStoreOperations(ops);
@@ -44,7 +44,7 @@
const messageIDs = ops.outboundP2PMessages.map(
message => message.messageID,
);
- processOutboundMessages(messageIDs);
+ processOutboundMessages(messageIDs, dmOpID);
}
}
dispatch({
diff --git a/lib/reducers/db-ops-reducer.js b/lib/reducers/db-ops-reducer.js
--- a/lib/reducers/db-ops-reducer.js
+++ b/lib/reducers/db-ops-reducer.js
@@ -8,6 +8,7 @@
MessageSourceMetadata,
DBOpsStore,
} from '../types/db-ops-types.js';
+import { scheduleP2PMessagesActionType } from '../types/dm-ops.js';
import { messageTypes } from '../types/message-types-enum.js';
import type { BaseAction } from '../types/redux-types.js';
import type { StoreOperations } from '../types/store-ops-types.js';
@@ -20,6 +21,17 @@
...store,
queuedOps: rest,
};
+ } else if (action.type === scheduleP2PMessagesActionType) {
+ const newEntry = {
+ ops: {
+ outboundP2PMessages: action.payload.messages,
+ },
+ dmOpID: action.payload.dmOpID,
+ };
+ return {
+ ...store,
+ queuedOps: [...store.queuedOps, newEntry],
+ };
}
return store;
diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js
--- a/lib/tunnelbroker/peer-to-peer-context.js
+++ b/lib/tunnelbroker/peer-to-peer-context.js
@@ -2,15 +2,21 @@
import invariant from 'invariant';
import * as React from 'react';
+import uuid from 'uuid';
import {
type TunnelbrokerClientMessageToDevice,
useTunnelbroker,
} from './tunnelbroker-context.js';
+import {
+ createMessagesToPeersFromDMOp,
+ type DMOperationSpecification,
+} from '../shared/dm-ops/dm-op-utils.js';
import {
IdentityClientContext,
type IdentityClientContextType,
} from '../shared/identity-client-context.js';
+import { scheduleP2PMessagesActionType } from '../types/dm-ops.js';
import {
type OutboundP2PMessage,
outboundP2PMessageStatuses,
@@ -22,9 +28,14 @@
import { getConfig } from '../utils/config.js';
import { createOlmSessionWithPeer } from '../utils/crypto-utils.js';
import { getMessageForException } from '../utils/errors.js';
+import { useDispatch, useSelector } from '../utils/redux-utils.js';
type PeerToPeerContextType = {
- +processOutboundMessages: (messageIDs?: $ReadOnlyArray<string>) => void,
+ +processOutboundMessages: (
+ outboundMessageIDs: ?$ReadOnlyArray<string>,
+ dmOpID: ?string,
+ ) => void,
+ +sendDMOperation: (op: DMOperationSpecification) => Promise<void>,
};
const PeerToPeerContext: React.Context<?PeerToPeerContextType> =
@@ -149,33 +160,84 @@
function PeerToPeerProvider(props: Props): React.Node {
const { children } = props;
- const processingQueue = React.useRef<Array<?$ReadOnlyArray<string>>>([]);
- const promiseRunning = React.useRef<boolean>(false);
-
const { sendMessageToDevice } = useTunnelbroker();
const identityContext = React.useContext(IdentityClientContext);
invariant(identityContext, 'Identity context should be set');
+ const dispatch = useDispatch();
+ const dmOpsSendingPromiseResolvers = React.useRef<
+ Map<string, { +resolve: () => mixed, +reject: Error => mixed }>,
+ >(new Map());
+ const auxUserStore = useSelector(state => state.auxUserStore);
+ const currentUserInfo = useSelector(state => state.currentUserInfo);
+
+ const sendDMOperation = React.useCallback(
+ async (op: DMOperationSpecification) => {
+ const dmOpID = uuid.v4();
+ const promise = new Promise<void>((resolve, reject) => {
+ dmOpsSendingPromiseResolvers.current.set(dmOpID, { resolve, reject });
+ });
+
+ const messages = await createMessagesToPeersFromDMOp(
+ op,
+ auxUserStore,
+ currentUserInfo,
+ );
+ dispatch({
+ type: scheduleP2PMessagesActionType,
+ payload: {
+ dmOpID,
+ messages,
+ },
+ });
+
+ return promise;
+ },
+ [auxUserStore, currentUserInfo, dispatch],
+ );
+
+ const processingQueue = React.useRef<
+ Array<{
+ +outboundMessageIDs: ?$ReadOnlyArray<string>,
+ +dmOpID: ?string,
+ }>,
+ >([]);
+ const promiseRunning = React.useRef<boolean>(false);
+
const processOutboundMessages = React.useCallback(
- (messageIDs?: $ReadOnlyArray<string>) => {
- processingQueue.current.push(messageIDs);
+ (outboundMessageIDs: ?$ReadOnlyArray<string>, dmOpID: ?string) => {
+ processingQueue.current.push({ outboundMessageIDs, dmOpID });
if (!promiseRunning.current) {
promiseRunning.current = true;
void (async () => {
do {
- const nextMessageIDs = processingQueue.current.shift();
+ const queueFront = processingQueue.current.shift();
try {
await processOutboundP2PMessages(
sendMessageToDevice,
identityContext,
- nextMessageIDs,
+ queueFront?.outboundMessageIDs,
);
+ if (queueFront.dmOpID) {
+ dmOpsSendingPromiseResolvers.current
+ .get(queueFront.dmOpID)
+ ?.resolve?.();
+ }
} catch (e) {
console.log(
`Error processing outbound P2P messages: ${
getMessageForException(e) ?? 'unknown'
}`,
);
+ if (queueFront.dmOpID) {
+ dmOpsSendingPromiseResolvers.current
+ .get(queueFront.dmOpID)
+ ?.reject?.(e);
+ }
+ } finally {
+ if (queueFront.dmOpID) {
+ dmOpsSendingPromiseResolvers.current.delete(queueFront.dmOpID);
+ }
}
} while (processingQueue.current.length > 0);
promiseRunning.current = false;
@@ -196,8 +258,9 @@
const value: PeerToPeerContextType = React.useMemo(
() => ({
processOutboundMessages,
+ sendDMOperation,
}),
- [processOutboundMessages],
+ [processOutboundMessages, sendDMOperation],
);
return (
diff --git a/lib/types/db-ops-types.js b/lib/types/db-ops-types.js
--- a/lib/types/db-ops-types.js
+++ b/lib/types/db-ops-types.js
@@ -14,6 +14,7 @@
}
| {
+ops: StoreOperations,
+ +dmOpID?: string,
};
export type DBOpsStore = {
diff --git a/lib/types/dm-ops.js b/lib/types/dm-ops.js
--- a/lib/types/dm-ops.js
+++ b/lib/types/dm-ops.js
@@ -4,6 +4,7 @@
import t from 'tcomb';
import type { RawMessageInfo } from './message-types.js';
+import type { OutboundP2PMessage } from './sqlite-types.js';
import {
type NonSidebarThickThreadType,
nonSidebarThickThreadTypes,
@@ -154,3 +155,9 @@
+rawMessageInfos: $ReadOnlyArray<RawMessageInfo>,
+updateInfos: $ReadOnlyArray<ClientUpdateInfo>,
};
+
+export const scheduleP2PMessagesActionType = 'SCHEDULE_P2P_MESSAGES';
+export type ScheduleP2PMessagesPayload = {
+ +dmOpID: string,
+ +messages: $ReadOnlyArray<OutboundP2PMessage>,
+};
diff --git a/lib/types/redux-types.js b/lib/types/redux-types.js
--- a/lib/types/redux-types.js
+++ b/lib/types/redux-types.js
@@ -40,7 +40,10 @@
GetVersionActionPayload,
LastCommunicatedPlatformDetails,
} from './device-types.js';
-import type { ProcessDMOpsPayload } from './dm-ops.js';
+import type {
+ ScheduleP2PMessagesPayload,
+ ProcessDMOpsPayload,
+} from './dm-ops.js';
import type { DraftStore } from './draft-types.js';
import type { EnabledApps, SupportedApps } from './enabled-apps.js';
import type {
@@ -1566,7 +1569,8 @@
| {
+type: 'PROCESS_DM_OPS',
+payload: ProcessDMOpsPayload,
- },
+ }
+ | { +type: 'SCHEDULE_P2P_MESSAGES', +payload: ScheduleP2PMessagesPayload },
}>;
export type ActionPayload = ?(Object | Array<*> | $ReadOnlyArray<*> | string);
diff --git a/web/shared-worker/utils/store.js b/web/shared-worker/utils/store.js
--- a/web/shared-worker/utils/store.js
+++ b/web/shared-worker/utils/store.js
@@ -224,7 +224,8 @@
convertedUserStoreOperations.length === 0 &&
convertedMessageStoreOperations.length === 0 &&
convertedThreadActivityStoreOperations.length === 0 &&
- convertedEntryStoreOperations.length === 0
+ convertedEntryStoreOperations.length === 0 &&
+ outboundP2PMessages?.length === 0
) {
return;
}

File Metadata

Mime Type
text/plain
Expires
Fri, Oct 18, 11:27 PM (20 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2320144
Default Alt Text
D12786.diff (8 KB)

Event Timeline