Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3318505
D13376.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Referenced Files
None
Subscribers
None
D13376.id.diff
View Options
diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js
--- a/lib/tunnelbroker/peer-to-peer-context.js
+++ b/lib/tunnelbroker/peer-to-peer-context.js
@@ -16,19 +16,16 @@
type IdentityClientContextType,
} from '../shared/identity-client-context.js';
import type { NotificationsCreationData } from '../types/notif-types.js';
-import {
- type OutboundP2PMessage,
- outboundP2PMessageStatuses,
-} from '../types/sqlite-types.js';
-import {
- type EncryptedMessage,
- peerToPeerMessageTypes,
-} from '../types/tunnelbroker/peer-to-peer-message-types.js';
+import { type OutboundP2PMessage } from '../types/sqlite-types.js';
import { getConfig } from '../utils/config.js';
import type { DeviceSessionCreationRequest } from '../utils/crypto-utils.js';
import { getMessageForException } from '../utils/errors.js';
import { entries } from '../utils/objects.js';
-import { olmSessionErrors } from '../utils/olm-utils.js';
+import {
+ ephemeralEncryptAndSendMessageToPeer,
+ handleOutboundP2PMessage,
+ type HandleOutboundP2PMessageResult,
+} from '../utils/peer-to-peer-communication-utils.js';
type PeerToPeerContextType = {
+processOutboundMessages: (
@@ -61,6 +58,50 @@
+failedMessageIDs: $ReadOnlyArray<string>,
};
+async function createMissingSession(
+ userDevicesWithoutSession: { [userID: string]: Array<string> },
+ peerOlmSessionsCreator: (
+ userID: string,
+ devices: $ReadOnlyArray<DeviceSessionCreationRequest>,
+ ) => Promise<void>,
+): Promise<void> {
+ const creatingSessionPromises = entries(userDevicesWithoutSession).map(
+ async ([userID, devices]) => {
+ try {
+ await peerOlmSessionsCreator(
+ userID,
+ devices.map(deviceID => ({ deviceID })),
+ );
+ } catch (e) {
+ // Session creation may fail for some devices,
+ // but we should still pursue delivery for others.
+ console.log(e);
+ }
+ },
+ );
+ await Promise.all(creatingSessionPromises);
+}
+
+function processOutboundP2PMessagesResult(
+ messageIDs: ?$ReadOnlyArray<string>,
+ sentMessagesMap: {
+ [messageID: string]: boolean,
+ },
+): ProcessOutboundP2PMessagesResult {
+ const sentMessagesSet = new Set(Object.keys(sentMessagesMap));
+ const failedMessageIDs =
+ messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? [];
+ if (failedMessageIDs.length > 0) {
+ return {
+ result: 'failure',
+ failedMessageIDs,
+ };
+ }
+ return {
+ result: 'success',
+ };
+}
+
async function processOutboundP2PMessages(
sendMessage: (
message: TunnelbrokerClientMessageToDevice,
@@ -98,6 +139,7 @@
const sentMessagesMap: { [messageID: string]: boolean } = {};
+ // 1. Retrieve messages to send.
let messages;
if (messageIDs) {
messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs);
@@ -116,116 +158,61 @@
messages = allMessages.filter(message => message.supportsAutoRetry);
}
- const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {};
+ const messagesMap: { [messageID: string]: OutboundP2PMessage } = {};
+
+ // 2. Optimistically attempt to send all messages. All should succeed,
+ // except messages for devices we don't have a session with, or when
+ // some other issue occurs (e.g. a network connection problem).
+ const messagesPromises: Array<Promise<HandleOutboundP2PMessageResult>> = [];
for (const message: OutboundP2PMessage of messages) {
- if (!devicesMap[message.deviceID]) {
- devicesMap[message.deviceID] = [message];
- } else {
- devicesMap[message.deviceID].push(message);
- }
+ messagesMap[message.messageID] = message;
+ messagesPromises.push(
+ handleOutboundP2PMessage(message, authMetadata, sendMessage),
+ );
}
-
- const sendMessageToPeer = async (
- message: OutboundP2PMessage,
- ): Promise<void> => {
- if (!authMetadata.deviceID || !authMetadata.userID) {
- return;
+ const messagesResults: Array<HandleOutboundP2PMessageResult> =
+ await Promise.all(messagesPromises);
+
+ // 3. Analyze results to retrieve all devices that need session creation
+ // and map by userID.
+ const userDevicesWithoutSession: { [userID: string]: Array<string> } = {};
+ const messagesToRetry: Array<OutboundP2PMessage> = [];
+ for (const result of messagesResults) {
+ if (result.status === 'success') {
+ sentMessagesMap[result.messageID] = true;
}
- try {
- const encryptedMessage: EncryptedMessage = {
- type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
- senderInfo: {
- deviceID: authMetadata.deviceID,
- userID: authMetadata.userID,
- },
- encryptedData: JSON.parse(message.ciphertext),
- };
- await sendMessage(
- {
- deviceID: message.deviceID,
- payload: JSON.stringify(encryptedMessage),
- },
- message.messageID,
- );
- await sqliteAPI.markOutboundP2PMessageAsSent(
- message.messageID,
- message.deviceID,
- );
- sentMessagesMap[message.messageID] = true;
- } catch (e) {
- console.error(e);
+ if (result.status === 'missing_session') {
+ messagesToRetry.push(messagesMap[result.messageID]);
+ const { userID, deviceID } = messagesMap[result.messageID];
+ if (userDevicesWithoutSession[userID]) {
+ userDevicesWithoutSession[userID].push(deviceID);
+ } else {
+ userDevicesWithoutSession[userID] = [deviceID];
+ }
}
- };
-
- const devicePromises = entries(devicesMap).map(
- async ([peerDeviceID, deviceMessages]) => {
- for (const message of deviceMessages) {
- if (message.status === outboundP2PMessageStatuses.persisted) {
- try {
- const result = await olmAPI.encryptAndPersist(
- message.plaintext,
- message.deviceID,
- message.messageID,
- );
-
- const encryptedMessage: OutboundP2PMessage = {
- ...message,
- ciphertext: JSON.stringify(result),
- };
- await sendMessageToPeer(encryptedMessage);
- } catch (e) {
- if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) {
- console.log(`Error sending messages to peer ${peerDeviceID}`, e);
- break;
- }
- try {
- await peerOlmSessionsCreator(message.userID, [
- { deviceID: peerDeviceID },
- ]);
- const result = await olmAPI.encryptAndPersist(
- message.plaintext,
- message.deviceID,
- message.messageID,
- );
- const encryptedMessage: OutboundP2PMessage = {
- ...message,
- ciphertext: JSON.stringify(result),
- };
+ }
- await sendMessageToPeer(encryptedMessage);
- } catch (err) {
- console.log(
- `Error sending messages to peer ${peerDeviceID}`,
- err,
- );
- break;
- }
- }
- } else if (message.status === outboundP2PMessageStatuses.encrypted) {
- await sendMessageToPeer(message);
- } else if (message.status === outboundP2PMessageStatuses.sent) {
- // Handle edge-case when message was sent, but it wasn't updated
- // in the message store.
- sentMessagesMap[message.messageID] = true;
- }
- }
- },
- );
+ if (messagesToRetry.length === 0) {
+ return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap);
+ }
- await Promise.all(devicePromises);
+ // 4. Create sessions with users who have at least one device
+ // without a session.
+ await createMissingSession(userDevicesWithoutSession, peerOlmSessionsCreator);
- const sentMessagesSet = new Set(Object.keys(sentMessagesMap));
- const failedMessageIDs =
- messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? [];
- if (failedMessageIDs.length > 0) {
- return {
- result: 'failure',
- failedMessageIDs,
- };
+ // 5. Retry messages for which the session was missing.
+ const retryPromises = messagesToRetry.map(message =>
+ handleOutboundP2PMessage(message, authMetadata, sendMessage),
+ );
+ const retryResults: Array<HandleOutboundP2PMessageResult> =
+ await Promise.all(retryPromises);
+ for (const result of retryResults) {
+ if (result.status === 'success') {
+ sentMessagesMap[result.messageID] = true;
+ }
}
- return {
- result: 'success',
- };
+
+ return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap);
}
const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000;
@@ -347,61 +334,56 @@
const { olmAPI } = getConfig();
await olmAPI.initializeCryptoAccount();
- // We want it distinct by device ID to avoid potentially creating
- // multiple Olm sessions with the same device simultaneously.
- const recipientsDistinctByDeviceID = [
- ...new Map(recipients.map(item => [item.deviceID, item])).values(),
- ];
- const senderInfo = { deviceID: thisDeviceID, userID: thisUserID };
- const promises = recipientsDistinctByDeviceID.map(async recipient => {
- try {
- const encryptedData = await olmAPI.encrypt(
- contentPayload,
- recipient.deviceID,
- );
- const encryptedMessage: EncryptedMessage = {
- type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
- senderInfo,
- encryptedData,
- };
- await sendMessageToDevice({
- deviceID: recipient.deviceID,
- payload: JSON.stringify(encryptedMessage),
- });
- } catch (e) {
- if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) {
- console.log(
- `Error sending messages to peer ${recipient.deviceID}`,
- e,
- );
- return;
- }
- try {
- await peerOlmSessionsCreator(recipient.userID, [
- { deviceID: recipient.deviceID },
- ]);
- const encryptedData = await olmAPI.encrypt(
- contentPayload,
- recipient.deviceID,
- );
- const encryptedMessage: EncryptedMessage = {
- type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
- senderInfo,
- encryptedData,
- };
- await sendMessageToDevice({
- deviceID: recipient.deviceID,
- payload: JSON.stringify(encryptedMessage),
- });
- } catch (err) {
- console.warn(
- `Error sending Olm-encrypted message to device ${recipient.deviceID}:`,
- err,
- );
+ // 1. Optimistically attempt to send all messages. All should
+ // succeed, except messages for devices we don't have a session
+ // with, or when some other issue occurs (e.g. a network problem).
+ const recipientPromises = recipients.map(async recipient =>
+ ephemeralEncryptAndSendMessageToPeer(
+ contentPayload,
+ recipient,
+ authMetadata,
+ sendMessageToDevice,
+ ),
+ );
+ const messagesResults = await Promise.all(recipientPromises);
+
+ // 2. Analyze results to retrieve all devices that need session creation
+ // and map by userID.
+ const userDevicesWithoutSession: { [userID: string]: Array<string> } = {};
+ const recipientsToRetry: Array<{ +userID: string, +deviceID: string }> =
+ [];
+ for (const result of messagesResults) {
+ if (result.status === 'missing_session') {
+ recipientsToRetry.push(result.recipient);
+ const { userID, deviceID } = result.recipient;
+ if (userDevicesWithoutSession[userID]) {
+ userDevicesWithoutSession[userID].push(deviceID);
+ } else {
+ userDevicesWithoutSession[userID] = [deviceID];
}
}
- });
- await Promise.all(promises);
+ }
+ if (recipientsToRetry.length === 0) {
+ return;
+ }
+
+ // 3. Create sessions with users who have at
+ // least one device without a session.
+ await createMissingSession(
+ userDevicesWithoutSession,
+ peerOlmSessionsCreator,
+ );
+
+ // 4. Retry recipients for which the session was missing.
+ const retryPromises = recipientsToRetry.map(async recipient =>
+ ephemeralEncryptAndSendMessageToPeer(
+ contentPayload,
+ recipient,
+ authMetadata,
+ sendMessageToDevice,
+ ),
+ );
+ await Promise.all(retryPromises);
},
[peerOlmSessionsCreator, sendMessageToDevice],
);
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Nov 20, 8:34 PM (21 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2548956
Default Alt Text
D13376.id.diff (12 KB)
Attached To
Mode
D13376: [lib] refactor sending `OutboundP2PMessages` to peers
Attached
Detach File
Event Timeline
Log In to Comment