Page MenuHomePhabricator

D13376.id.diff
No OneTemporary

D13376.id.diff

diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js
--- a/lib/tunnelbroker/peer-to-peer-context.js
+++ b/lib/tunnelbroker/peer-to-peer-context.js
@@ -16,19 +16,16 @@
type IdentityClientContextType,
} from '../shared/identity-client-context.js';
import type { NotificationsCreationData } from '../types/notif-types.js';
-import {
- type OutboundP2PMessage,
- outboundP2PMessageStatuses,
-} from '../types/sqlite-types.js';
-import {
- type EncryptedMessage,
- peerToPeerMessageTypes,
-} from '../types/tunnelbroker/peer-to-peer-message-types.js';
+import { type OutboundP2PMessage } from '../types/sqlite-types.js';
import { getConfig } from '../utils/config.js';
import type { DeviceSessionCreationRequest } from '../utils/crypto-utils.js';
import { getMessageForException } from '../utils/errors.js';
import { entries } from '../utils/objects.js';
-import { olmSessionErrors } from '../utils/olm-utils.js';
+import {
+ ephemeralEncryptAndSendMessageToPeer,
+ handleOutboundP2PMessage,
+ type HandleOutboundP2PMessageResult,
+} from '../utils/peer-to-peer-communication-utils.js';
type PeerToPeerContextType = {
+processOutboundMessages: (
@@ -61,6 +58,50 @@
+failedMessageIDs: $ReadOnlyArray<string>,
};
+async function createMissingSession(
+ userDevicesWithoutSession: { [userID: string]: Array<string> },
+ peerOlmSessionsCreator: (
+ userID: string,
+ devices: $ReadOnlyArray<DeviceSessionCreationRequest>,
+ ) => Promise<void>,
+): Promise<void> {
+ const creatingSessionPromises = entries(userDevicesWithoutSession).map(
+ async ([userID, devices]) => {
+ try {
+ await peerOlmSessionsCreator(
+ userID,
+ devices.map(deviceID => ({ deviceID })),
+ );
+ } catch (e) {
+ // Session creation may fail for some devices,
+ // but we should still pursue delivery for others.
+ console.log(e);
+ }
+ },
+ );
+ await Promise.all(creatingSessionPromises);
+}
+
+function processOutboundP2PMessagesResult(
+ messageIDs: ?$ReadOnlyArray<string>,
+ sentMessagesMap: {
+ [messageID: string]: boolean,
+ },
+): ProcessOutboundP2PMessagesResult {
+ const sentMessagesSet = new Set(Object.keys(sentMessagesMap));
+ const failedMessageIDs =
+ messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? [];
+ if (failedMessageIDs.length > 0) {
+ return {
+ result: 'failure',
+ failedMessageIDs,
+ };
+ }
+ return {
+ result: 'success',
+ };
+}
+
async function processOutboundP2PMessages(
sendMessage: (
message: TunnelbrokerClientMessageToDevice,
@@ -98,6 +139,7 @@
const sentMessagesMap: { [messageID: string]: boolean } = {};
+ // 1. Retrieve messages to send.
let messages;
if (messageIDs) {
messages = await sqliteAPI.getOutboundP2PMessagesByID(messageIDs);
@@ -116,116 +158,61 @@
messages = allMessages.filter(message => message.supportsAutoRetry);
}
- const devicesMap: { [deviceID: string]: OutboundP2PMessage[] } = {};
+ const messagesMap: { [messageID: string]: OutboundP2PMessage } = {};
+
+ // 2. Optimistically attempt to send all messages. All should succeed;
+ // the only exceptions are messages for devices we don't have a session
+ // with, or other issues such as network connection problems.
+ const messagesPromises: Array<Promise<HandleOutboundP2PMessageResult>> = [];
for (const message: OutboundP2PMessage of messages) {
- if (!devicesMap[message.deviceID]) {
- devicesMap[message.deviceID] = [message];
- } else {
- devicesMap[message.deviceID].push(message);
- }
+ messagesMap[message.messageID] = message;
+ messagesPromises.push(
+ handleOutboundP2PMessage(message, authMetadata, sendMessage),
+ );
}
-
- const sendMessageToPeer = async (
- message: OutboundP2PMessage,
- ): Promise<void> => {
- if (!authMetadata.deviceID || !authMetadata.userID) {
- return;
+ const messagesResults: Array<HandleOutboundP2PMessageResult> =
+ await Promise.all(messagesPromises);
+
+ // 3. Analyze results to find all devices that need session creation
+ // and group them by userID.
+ const userDevicesWithoutSession: { [userID: string]: Array<string> } = {};
+ const messagesToRetry: Array<OutboundP2PMessage> = [];
+ for (const result of messagesResults) {
+ if (result.status === 'success') {
+ sentMessagesMap[result.messageID] = true;
}
- try {
- const encryptedMessage: EncryptedMessage = {
- type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
- senderInfo: {
- deviceID: authMetadata.deviceID,
- userID: authMetadata.userID,
- },
- encryptedData: JSON.parse(message.ciphertext),
- };
- await sendMessage(
- {
- deviceID: message.deviceID,
- payload: JSON.stringify(encryptedMessage),
- },
- message.messageID,
- );
- await sqliteAPI.markOutboundP2PMessageAsSent(
- message.messageID,
- message.deviceID,
- );
- sentMessagesMap[message.messageID] = true;
- } catch (e) {
- console.error(e);
+ if (result.status === 'missing_session') {
+ messagesToRetry.push(messagesMap[result.messageID]);
+ const { userID, deviceID } = messagesMap[result.messageID];
+ if (userDevicesWithoutSession[userID]) {
+ userDevicesWithoutSession[userID].push(deviceID);
+ } else {
+ userDevicesWithoutSession[userID] = [deviceID];
+ }
}
- };
-
- const devicePromises = entries(devicesMap).map(
- async ([peerDeviceID, deviceMessages]) => {
- for (const message of deviceMessages) {
- if (message.status === outboundP2PMessageStatuses.persisted) {
- try {
- const result = await olmAPI.encryptAndPersist(
- message.plaintext,
- message.deviceID,
- message.messageID,
- );
-
- const encryptedMessage: OutboundP2PMessage = {
- ...message,
- ciphertext: JSON.stringify(result),
- };
- await sendMessageToPeer(encryptedMessage);
- } catch (e) {
- if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) {
- console.log(`Error sending messages to peer ${peerDeviceID}`, e);
- break;
- }
- try {
- await peerOlmSessionsCreator(message.userID, [
- { deviceID: peerDeviceID },
- ]);
- const result = await olmAPI.encryptAndPersist(
- message.plaintext,
- message.deviceID,
- message.messageID,
- );
- const encryptedMessage: OutboundP2PMessage = {
- ...message,
- ciphertext: JSON.stringify(result),
- };
+ }
- await sendMessageToPeer(encryptedMessage);
- } catch (err) {
- console.log(
- `Error sending messages to peer ${peerDeviceID}`,
- err,
- );
- break;
- }
- }
- } else if (message.status === outboundP2PMessageStatuses.encrypted) {
- await sendMessageToPeer(message);
- } else if (message.status === outboundP2PMessageStatuses.sent) {
- // Handle edge-case when message was sent, but it wasn't updated
- // in the message store.
- sentMessagesMap[message.messageID] = true;
- }
- }
- },
- );
+ if (messagesToRetry.length === 0) {
+ return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap);
+ }
- await Promise.all(devicePromises);
+ // 4. Create sessions with users who have at least one device
+ // without a session.
+ await createMissingSession(userDevicesWithoutSession, peerOlmSessionsCreator);
- const sentMessagesSet = new Set(Object.keys(sentMessagesMap));
- const failedMessageIDs =
- messageIDs?.filter(id => !sentMessagesSet.has(id)) ?? [];
- if (failedMessageIDs.length > 0) {
- return {
- result: 'failure',
- failedMessageIDs,
- };
+ // 5. Retry messages for which the session was missing.
+ const retryPromises = messagesToRetry.map(message =>
+ handleOutboundP2PMessage(message, authMetadata, sendMessage),
+ );
+ const retryResults: Array<HandleOutboundP2PMessageResult> =
+ await Promise.all(retryPromises);
+ for (const result of retryResults) {
+ if (result.status === 'success') {
+ sentMessagesMap[result.messageID] = true;
+ }
}
- return {
- result: 'success',
- };
+
+ return processOutboundP2PMessagesResult(messageIDs, sentMessagesMap);
}
const AUTOMATIC_RETRY_FREQUENCY = 30 * 1000;
@@ -347,61 +334,56 @@
const { olmAPI } = getConfig();
await olmAPI.initializeCryptoAccount();
- // We want it distinct by device ID to avoid potentially creating
- // multiple Olm sessions with the same device simultaneously.
- const recipientsDistinctByDeviceID = [
- ...new Map(recipients.map(item => [item.deviceID, item])).values(),
- ];
- const senderInfo = { deviceID: thisDeviceID, userID: thisUserID };
- const promises = recipientsDistinctByDeviceID.map(async recipient => {
- try {
- const encryptedData = await olmAPI.encrypt(
- contentPayload,
- recipient.deviceID,
- );
- const encryptedMessage: EncryptedMessage = {
- type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
- senderInfo,
- encryptedData,
- };
- await sendMessageToDevice({
- deviceID: recipient.deviceID,
- payload: JSON.stringify(encryptedMessage),
- });
- } catch (e) {
- if (!e.message?.includes(olmSessionErrors.sessionDoesNotExist)) {
- console.log(
- `Error sending messages to peer ${recipient.deviceID}`,
- e,
- );
- return;
- }
- try {
- await peerOlmSessionsCreator(recipient.userID, [
- { deviceID: recipient.deviceID },
- ]);
- const encryptedData = await olmAPI.encrypt(
- contentPayload,
- recipient.deviceID,
- );
- const encryptedMessage: EncryptedMessage = {
- type: peerToPeerMessageTypes.ENCRYPTED_MESSAGE,
- senderInfo,
- encryptedData,
- };
- await sendMessageToDevice({
- deviceID: recipient.deviceID,
- payload: JSON.stringify(encryptedMessage),
- });
- } catch (err) {
- console.warn(
- `Error sending Olm-encrypted message to device ${recipient.deviceID}:`,
- err,
- );
+ // 1. Optimistically attempt to send all messages. All should
+ // succeed; the only exceptions are messages for devices we don't
+ // have a session with, or other issues such as network problems.
+ const recipientPromises = recipients.map(async recipient =>
+ ephemeralEncryptAndSendMessageToPeer(
+ contentPayload,
+ recipient,
+ authMetadata,
+ sendMessageToDevice,
+ ),
+ );
+ const messagesResults = await Promise.all(recipientPromises);
+
+ // 2. Analyze results to find all devices that need session creation
+ // and group them by userID.
+ const userDevicesWithoutSession: { [userID: string]: Array<string> } = {};
+ const recipientsToRetry: Array<{ +userID: string, +deviceID: string }> =
+ [];
+ for (const result of messagesResults) {
+ if (result.status === 'missing_session') {
+ recipientsToRetry.push(result.recipient);
+ const { userID, deviceID } = result.recipient;
+ if (userDevicesWithoutSession[userID]) {
+ userDevicesWithoutSession[userID].push(deviceID);
+ } else {
+ userDevicesWithoutSession[userID] = [deviceID];
}
}
- });
- await Promise.all(promises);
+ }
+ if (recipientsToRetry.length === 0) {
+ return;
+ }
+
+ // 3. Create sessions with users who have at
+ // least one device without a session.
+ await createMissingSession(
+ userDevicesWithoutSession,
+ peerOlmSessionsCreator,
+ );
+
+ // 4. Retry recipients for which the session was missing.
+ const retryPromises = recipientsToRetry.map(async recipient =>
+ ephemeralEncryptAndSendMessageToPeer(
+ contentPayload,
+ recipient,
+ authMetadata,
+ sendMessageToDevice,
+ ),
+ );
+ await Promise.all(retryPromises);
},
[peerOlmSessionsCreator, sendMessageToDevice],
);

File Metadata

Mime Type
text/plain
Expires
Wed, Nov 20, 8:34 PM (21 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2548956
Default Alt Text
D13376.id.diff (12 KB)

Event Timeline