Page MenuHomePhabricator

D13623.id45095.diff
No OneTemporary

D13623.id45095.diff

diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js
--- a/lib/tunnelbroker/peer-to-peer-message-handler.js
+++ b/lib/tunnelbroker/peer-to-peer-message-handler.js
@@ -8,6 +8,7 @@
usePeerToPeerMessageHandler,
} from './use-peer-to-peer-message-handler.js';
import { useLoggedInUserInfo } from '../hooks/account-hooks.js';
+import { useActionsQueue } from '../hooks/actions-queue.js';
import type { InboundP2PMessage } from '../types/sqlite-types.js';
import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js';
import {
@@ -33,16 +34,74 @@
const peerToPeerMessageHandler = usePeerToPeerMessageHandler();
const handleOlmMessageToDevice = useHandleOlmMessageToDevice();
- const [messagesQueue, setMessagesQueue] = React.useState<
- $ReadOnlyArray<{
- +peerToPeerMessage: PeerToPeerMessage,
- +messageID: string,
- +localSocketSessionCounter: number,
- }>,
- >([]);
- const [isProcessing, setProcessing] = React.useState(false);
- const [processedInboundMessages, setProcessedInboundMessages] =
- React.useState(false);
+ const processItem = React.useCallback(
+ async (
+ item:
+ | {
+ +type: 'persisted_message',
+ +message: InboundP2PMessage,
+ }
+ | {
+ +type: 'received_message',
+ +message: {
+ +peerToPeerMessage: PeerToPeerMessage,
+ +messageID: string,
+ +localSocketSessionCounter: number,
+ },
+ },
+ ) => {
+ if (item.type === 'persisted_message') {
+ const { message } = item;
+ try {
+ await handleOlmMessageToDevice(
+ message.plaintext,
+ { deviceID: message.senderDeviceID, userID: message.senderUserID },
+ message.messageID,
+ );
+ } catch (e) {
+ console.log('Failed processing Olm P2P message:', e);
+ }
+ } else {
+ const { peerToPeerMessage, messageID, localSocketSessionCounter } =
+ item.message;
+ // Since scheduling processing this message socket is closed
+ // or was closed and reopened, we have to stop processing
+ // because Tunnelbroker flushes the message again when opening
+ // the socket, and we want to process this only once.
+ if (
+ localSocketSessionCounter !== getSessionCounter() ||
+ !doesSocketExist()
+ ) {
+ return;
+ }
+
+ try {
+ await peerToPeerMessageHandler(peerToPeerMessage, messageID);
+ } catch (e) {
+ console.log(e.message);
+ } finally {
+ if (
+ localSocketSessionCounter === getSessionCounter() &&
+ doesSocketExist()
+ ) {
+ const confirmation: MessageReceiveConfirmation = {
+ type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
+ messageIDs: [messageID],
+ };
+ socketSend(JSON.stringify(confirmation));
+ }
+ }
+ }
+ },
+ [
+ doesSocketExist,
+ getSessionCounter,
+ handleOlmMessageToDevice,
+ peerToPeerMessageHandler,
+ socketSend,
+ ],
+ );
+ const { enqueue } = useActionsQueue(processItem);
const tunnelbrokerMessageListener = React.useCallback(
async (message: TunnelbrokerToDeviceMessage) => {
@@ -76,121 +135,53 @@
return;
}
const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage;
- setMessagesQueue(oldQueue => [
- ...oldQueue,
+ enqueue([
{
- peerToPeerMessage,
- messageID: message.messageID,
- localSocketSessionCounter: getSessionCounter(),
+ type: 'received_message',
+ message: {
+ peerToPeerMessage,
+ messageID: message.messageID,
+ localSocketSessionCounter: getSessionCounter(),
+ },
},
]);
},
- [getSessionCounter, socketSend],
+ [enqueue, getSessionCounter, socketSend],
);
- const processPersistedInboundMessages = React.useCallback(async () => {
- if (isProcessing || processedInboundMessages) {
- return;
- }
- setProcessing(true);
+ React.useEffect(() => {
+ addListener(tunnelbrokerMessageListener);
+ return () => {
+ removeListener(tunnelbrokerMessageListener);
+ };
+ }, [addListener, removeListener, tunnelbrokerMessageListener]);
+ const processPersistedInboundMessages = React.useCallback(async () => {
try {
const { sqliteAPI } = getConfig();
const messages = await sqliteAPI.getAllInboundP2PMessages();
-
- for (const message: InboundP2PMessage of messages) {
- try {
- await handleOlmMessageToDevice(
- message.plaintext,
- { deviceID: message.senderDeviceID, userID: message.senderUserID },
- message.messageID,
- );
- } catch (e) {
- console.log('Failed processing Olm P2P message:', e);
- }
- }
- } finally {
- setProcessedInboundMessages(true);
- setProcessing(false);
- }
- }, [handleOlmMessageToDevice, isProcessing, processedInboundMessages]);
-
- const processMessage = React.useCallback(async () => {
- if (messagesQueue.length === 0 || isProcessing) {
- return;
- }
-
- setProcessing(true);
-
- const { peerToPeerMessage, messageID, localSocketSessionCounter } =
- messagesQueue[0];
-
- // Since scheduling processing this message socket is closed
- // or was closed and reopened, we have to stop processing
- // because Tunnelbroker flushes the message again when opening
- // the socket, and we want to process this only once.
- if (
- localSocketSessionCounter !== getSessionCounter() ||
- !doesSocketExist()
- ) {
- setMessagesQueue(currentQueue => currentQueue.slice(1));
- setProcessing(false);
- return;
- }
-
- try {
- await peerToPeerMessageHandler(peerToPeerMessage, messageID);
+ enqueue(
+ messages.map(message => ({
+ type: 'persisted_message',
+ message,
+ })),
+ );
} catch (e) {
- console.log(e.message);
- } finally {
- if (
- localSocketSessionCounter === getSessionCounter() &&
- doesSocketExist()
- ) {
- const confirmation: MessageReceiveConfirmation = {
- type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
- messageIDs: [messageID],
- };
- socketSend(JSON.stringify(confirmation));
- }
- setMessagesQueue(currentQueue => currentQueue.slice(1));
- setProcessing(false);
+ console.log('error while reading persisted inbound messages:', e.message);
}
- }, [
- doesSocketExist,
- getSessionCounter,
- isProcessing,
- messagesQueue,
- peerToPeerMessageHandler,
- socketSend,
- ]);
+ }, [enqueue]);
const loggedInUserInfo = useLoggedInUserInfo();
const viewerID = loggedInUserInfo?.id;
+
+ const processingInputMessagesStarted = React.useRef(false);
React.useEffect(() => {
- if (isProcessing || !viewerID) {
+ if (!viewerID || processingInputMessagesStarted.current) {
return;
}
- if (!processedInboundMessages) {
- void processPersistedInboundMessages();
- } else if (messagesQueue.length > 0) {
- void processMessage();
- }
- }, [
- messagesQueue,
- isProcessing,
- processMessage,
- processedInboundMessages,
- processPersistedInboundMessages,
- viewerID,
- ]);
-
- React.useEffect(() => {
- addListener(tunnelbrokerMessageListener);
- return () => {
- removeListener(tunnelbrokerMessageListener);
- };
- }, [addListener, removeListener, tunnelbrokerMessageListener]);
+ processingInputMessagesStarted.current = true;
+ void processPersistedInboundMessages();
+ }, [processPersistedInboundMessages, viewerID]);
}
export { PeerToPeerMessageHandler };

File Metadata

Mime Type
text/plain
Expires
Fri, Oct 18, 2:05 PM (8 h, 33 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2314208
Default Alt Text
D13623.id45095.diff (7 KB)

Event Timeline