diff --git a/lib/shared/farcaster/farcaster-hooks.js b/lib/shared/farcaster/farcaster-hooks.js
--- a/lib/shared/farcaster/farcaster-hooks.js
+++ b/lib/shared/farcaster/farcaster-hooks.js
@@ -79,21 +79,32 @@
 const FARCASTER_DATA_BATCH_SIZE = 20;
 const MAX_RETRIES = 3;
 const RETRY_DELAY_MS = 1000;
-const MAX_BATCH_MESSAGE_COUNT = 1000;
+// Threshold on a batch's estimated size (summed text lengths of its TEXT
+// messages) above which the batch is split and dispatched.
+const MAX_BATCH_MESSAGE_SIZE = 500000;
 
 class BatchedUpdates {
   userIDs: Set;
   updateInfos: Array;
   messageInfos: Array;
   additionalMessageInfos: Array;
-  messageCount: number;
+  estimatedSize: number;
 
   constructor() {
     this.userIDs = new Set();
     this.updateInfos = ([]: Array);
     this.messageInfos = ([]: Array);
     this.additionalMessageInfos = ([]: Array);
-    this.messageCount = 0;
+    this.estimatedSize = 0;
+  }
+
+  // Rough size estimate for a message: the length of its text for non-empty
+  // TEXT messages, 0 for everything else.
+  estimateMessageSize(messageInfo: RawMessageInfo): number {
+    if (messageInfo.type === messageTypes.TEXT && messageInfo.text) {
+      return messageInfo.text.length;
+    }
+    return 0;
   }
 
   addUserID(userID: string): void {
@@ -110,28 +121,34 @@
       updateInfo.type === updateTypes.JOIN_THREAD &&
       updateInfo.rawMessageInfos
     ) {
-      this.messageCount += updateInfo.rawMessageInfos.length;
+      for (const messageInfo of updateInfo.rawMessageInfos) {
+        this.estimatedSize += this.estimateMessageSize(messageInfo);
+      }
     }
   }
 
   addMessageInfo(messageInfo: RawMessageInfo): void {
     this.messageInfos.push(messageInfo);
-    this.messageCount += 1;
+    this.estimatedSize += this.estimateMessageSize(messageInfo);
   }
 
   addMessageInfos(messageInfos: Array): void {
     this.messageInfos.push(...messageInfos);
-    this.messageCount += messageInfos.length;
+    for (const messageInfo of messageInfos) {
+      this.estimatedSize += this.estimateMessageSize(messageInfo);
+    }
   }
 
   addAdditionalMessageInfo(messageInfo: RawMessageInfo): void {
     this.additionalMessageInfos.push(messageInfo);
-    this.messageCount += 1;
+    this.estimatedSize += this.estimateMessageSize(messageInfo);
   }
 
   addAdditionalMessageInfos(messageInfos: Array): void {
     this.additionalMessageInfos.push(...messageInfos);
-    this.messageCount += messageInfos.length;
+    for (const messageInfo of messageInfos) {
+      this.estimatedSize += this.estimateMessageSize(messageInfo);
+    }
   }
 
   isEmpty(): boolean {
@@ -143,8 +160,61 @@
     );
   }
 
-  getMessageCount(): number {
-    return this.messageCount;
+  getEstimatedSize(): number {
+    return this.estimatedSize;
+  }
+
+  // Moves messages out of this batch until its estimated size is at most
+  // maxSize and returns them, in their original order, as a new batch.
+  // additionalMessageInfos are moved first, then messageInfos; updateInfos
+  // and userIDs are never moved. A batch that is over the limit always keeps
+  // at least one update or message, so it can stay above maxSize.
+  trimToSize(maxSize: number): BatchedUpdates {
+    const overflow = new BatchedUpdates();
+
+    if (this.estimatedSize <= maxSize) {
+      return overflow;
+    }
+
+    // If this batch holds nothing but additionalMessageInfos, keep one of
+    // them. Otherwise an oversized message would be moved to the overflow
+    // every time, and a caller that keeps trimming until the overflow fits
+    // would never terminate.
+    const minAdditionalMessages =
+      this.messageInfos.length === 0 && this.updateInfos.length === 0 ? 1 : 0;
+
+    const overflowAdditionalMessages = [];
+    while (
+      this.additionalMessageInfos.length > minAdditionalMessages &&
+      this.estimatedSize > maxSize
+    ) {
+      const message = this.additionalMessageInfos.pop();
+      if (message) {
+        const messageSize = this.estimateMessageSize(message);
+        this.estimatedSize -= messageSize;
+        overflowAdditionalMessages.push(message);
+      }
+    }
+
+    const overflowMessages = [];
+    // The size of a message is bounded, so it shouldn't matter in practice,
+    // but just in case, we should keep at least one message in a batch
+    while (this.messageInfos.length > 1 && this.estimatedSize > maxSize) {
+      const message = this.messageInfos.pop();
+      if (message) {
+        const messageSize = this.estimateMessageSize(message);
+        this.estimatedSize -= messageSize;
+        overflowMessages.push(message);
+      }
+    }
+
+    // We're popping from the end of the arrays, so we need to reverse them
+    overflowAdditionalMessages.reverse();
+    overflowMessages.reverse();
+    overflow.addAdditionalMessageInfos(overflowAdditionalMessages);
+    overflow.addMessageInfos(overflowMessages);
+
+    return overflow;
   }
 
   merge(other: BatchedUpdates): void {
@@ -152,7 +222,7 @@
     this.updateInfos.push(...other.updateInfos);
     this.messageInfos.push(...other.messageInfos);
     this.additionalMessageInfos.push(...other.additionalMessageInfos);
-    this.messageCount += other.messageCount;
+    this.estimatedSize += other.estimatedSize;
   }
 
   getReduxPayload(): ProcessFarcasterOpsPayload {
@@ -249,14 +319,17 @@
         if (itemResult) {
           results.push(itemResult.result);
           batchedUpdates.merge(itemResult.updates);
-          const messagesCount = batchedUpdates.getMessageCount();
-          if (messagesCount > MAX_BATCH_MESSAGE_COUNT) {
+          // Dispatch the part that fits and carry the trimmed-off messages
+          // over as the next batch, repeating while that is still too large.
+          while (batchedUpdates.getEstimatedSize() > MAX_BATCH_MESSAGE_SIZE) {
+            const overflow = batchedUpdates.trimToSize(MAX_BATCH_MESSAGE_SIZE);
+
             dispatch({
               type: processFarcasterOpsActionType,
               payload: batchedUpdates.getReduxPayload(),
             });
 
-            batchedUpdates = new BatchedUpdates();
+            batchedUpdates = overflow;
           }
         } else {
           nullResultsInBatch++;