diff --git a/lib/tunnelbroker/peer-to-peer-context.js b/lib/tunnelbroker/peer-to-peer-context.js --- a/lib/tunnelbroker/peer-to-peer-context.js +++ b/lib/tunnelbroker/peer-to-peer-context.js @@ -128,53 +128,61 @@ } }; + const devicePromises = []; for (const peerDeviceID in devicesMap) { - for (const message of devicesMap[peerDeviceID]) { - if (message.status === outboundP2PMessageStatuses.persisted) { - try { - const result = await olmAPI.encryptAndPersist( - message.plaintext, - message.deviceID, - message.messageID, - ); - - const encryptedMessage: OutboundP2PMessage = { - ...message, - ciphertext: JSON.stringify(result), - }; - await sendMessageToPeer(encryptedMessage); - } catch (e) { - if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { - console.log(`Error sending messages to peer ${peerDeviceID}`, e); - break; - } + const devicePromise = (async () => { + for (const message of devicesMap[peerDeviceID]) { + if (message.status === outboundP2PMessageStatuses.persisted) { try { - await peerOlmSessionsCreator(message.userID, peerDeviceID); const result = await olmAPI.encryptAndPersist( message.plaintext, message.deviceID, message.messageID, ); + const encryptedMessage: OutboundP2PMessage = { ...message, ciphertext: JSON.stringify(result), }; - await sendMessageToPeer(encryptedMessage); - } catch (err) { - console.log(`Error sending messages to peer ${peerDeviceID}`, err); - break; + } catch (e) { + if (!e.message?.includes(olmSessionErrors.sessionDoesNotExists)) { + console.log(`Error sending messages to peer ${peerDeviceID}`, e); + break; + } + try { + await peerOlmSessionsCreator(message.userID, peerDeviceID); + const result = await olmAPI.encryptAndPersist( + message.plaintext, + message.deviceID, + message.messageID, + ); + const encryptedMessage: OutboundP2PMessage = { + ...message, + ciphertext: JSON.stringify(result), + }; + + await sendMessageToPeer(encryptedMessage); + } catch (err) { + console.log( + `Error sending messages to peer ${peerDeviceID}`, + err, + ); + break; + } } + } else if (message.status === outboundP2PMessageStatuses.encrypted) { + await sendMessageToPeer(message); + } else if (message.status === outboundP2PMessageStatuses.sent) { + // Handle edge-case when message was sent, but it wasn't updated + // in the message store. + sentMessagesMap[message.messageID] = true; } - } else if (message.status === outboundP2PMessageStatuses.encrypted) { - await sendMessageToPeer(message); - } else if (message.status === outboundP2PMessageStatuses.sent) { - // Handle edge-case when message was sent, but it wasn't updated - // in the message store. - sentMessagesMap[message.messageID] = true; } - } + })(); + devicePromises.push(devicePromise); } + await Promise.all(devicePromises); return Object.keys(sentMessagesMap); }