Page MenuHomePhabricator

D13078.diff
No OneTemporary

D13078.diff

diff --git a/lib/tunnelbroker/peer-to-peer-message-handler.js b/lib/tunnelbroker/peer-to-peer-message-handler.js
--- a/lib/tunnelbroker/peer-to-peer-message-handler.js
+++ b/lib/tunnelbroker/peer-to-peer-message-handler.js
@@ -26,7 +26,14 @@
const { addListener, removeListener } = useTunnelbroker();
const peerToPeerMessageHandler = usePeerToPeerMessageHandler();
- const currentlyProcessedMessage = React.useRef<?Promise<mixed>>(null);
+ const [messagesQueue, setMessagesQueue] = React.useState<
+ $ReadOnlyArray<{
+ +peerToPeerMessage: PeerToPeerMessage,
+ +messageID: string,
+ +localSocketSessionCounter: number,
+ }>,
+ >([]);
+ const [isProcessing, setProcessing] = React.useState(false);
const tunnelbrokerMessageListener = React.useCallback(
async (message: TunnelbrokerToDeviceMessage) => {
@@ -60,38 +67,74 @@
return;
}
const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage;
- currentlyProcessedMessage.current = (async () => {
- const localSocketSessionCounter = getSessionCounter();
- await currentlyProcessedMessage.current;
- // Since scheduling processing this message socket is closed
- // or was closed and reopened, we have to stop processing
- // because Tunnelbroker flushes the message again when opening
- // the socket, and we want to process this only once
- // to maintain order.
- if (
- localSocketSessionCounter !== getSessionCounter() ||
- !doesSocketExist()
- ) {
- return;
- }
- try {
- await peerToPeerMessageHandler(peerToPeerMessage, message.messageID);
- } catch (e) {
- console.log(e.message);
- } finally {
- if (
- localSocketSessionCounter === getSessionCounter() &&
- doesSocketExist()
- ) {
- // We confirm regardless of success or error while processing.
- socketSend(JSON.stringify(confirmation));
- }
- }
- })();
+ setMessagesQueue(oldQueue => [
+ ...oldQueue,
+ {
+ peerToPeerMessage,
+ messageID: message.messageID,
+ localSocketSessionCounter: getSessionCounter(),
+ },
+ ]);
},
- [getSessionCounter, peerToPeerMessageHandler, doesSocketExist, socketSend],
+ [getSessionCounter, socketSend],
);
+ const processMessage = React.useCallback(async () => {
+ if (messagesQueue.length === 0 || isProcessing) {
+ return;
+ }
+
+ setProcessing(true);
+
+ const { peerToPeerMessage, messageID, localSocketSessionCounter } =
+ messagesQueue[0];
+
+ // If the socket was closed, or closed and reopened, after this
+ // message was scheduled for processing, we have to skip it,
+ // because Tunnelbroker flushes the message again when opening
+ // the socket, and we want to process it only once.
+ if (
+ localSocketSessionCounter !== getSessionCounter() ||
+ !doesSocketExist()
+ ) {
+ setMessagesQueue(currentQueue => currentQueue.slice(1));
+ setProcessing(false);
+ return;
+ }
+
+ try {
+ await peerToPeerMessageHandler(peerToPeerMessage, messageID);
+ } catch (e) {
+ console.log(e.message);
+ } finally {
+ if (
+ localSocketSessionCounter === getSessionCounter() &&
+ doesSocketExist()
+ ) {
+ const confirmation: MessageReceiveConfirmation = {
+ type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
+ messageIDs: [messageID],
+ };
+ socketSend(JSON.stringify(confirmation));
+ }
+ setMessagesQueue(currentQueue => currentQueue.slice(1));
+ setProcessing(false);
+ }
+ }, [
+ doesSocketExist,
+ getSessionCounter,
+ isProcessing,
+ messagesQueue,
+ peerToPeerMessageHandler,
+ socketSend,
+ ]);
+
+ React.useEffect(() => {
+ if (!isProcessing && messagesQueue.length > 0) {
+ void processMessage();
+ }
+ }, [messagesQueue, isProcessing, processMessage]);
+
React.useEffect(() => {
addListener(tunnelbrokerMessageListener);
return () => {

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 30, 5:01 PM (20 h, 29 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2601410
Default Alt Text
D13078.diff (4 KB)

Event Timeline