Use smaller batches for receipts and syncs

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2021-07-29 18:42:41 -07:00 committed by GitHub
parent 49c7a70a75
commit 165e385a84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 21 deletions

View File

@ -320,7 +320,7 @@ export async function startApp(): Promise<void> {
{
name: 'Whisper.deliveryReceiptBatcher',
wait: 500,
maxSize: 500,
maxSize: 100,
processBatch: async items => {
const byConversationId = window._.groupBy(items, item =>
window.ConversationController.ensureContactIds({

View File

@ -5,6 +5,7 @@
import * as z from 'zod';
import * as moment from 'moment';
import { chunk } from 'lodash';
import { getSendOptions } from '../util/getSendOptions';
import { handleMessageSend } from '../util/handleMessageSend';
import { isNotNil } from '../util/isNotNil';
@ -21,6 +22,8 @@ import { parseIntWithFallback } from '../util/parseIntWithFallback';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
const CHUNK_SIZE = 100;
const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds();
const readSyncJobDataSchema = z.object({
@ -79,17 +82,21 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
await sleep(exponentialBackoffSleepTime(attempt));
const messageIds = readSyncs.map(item => item.messageId).filter(isNotNil);
const ourConversation = window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
try {
await handleMessageSend(
window.textsecure.messaging.syncReadMessages(readSyncs, sendOptions),
{ messageIds, sendType: 'readSync' }
await Promise.all(
chunk(readSyncs, CHUNK_SIZE).map(batch => {
const messageIds = batch.map(item => item.messageId).filter(isNotNil);
return handleMessageSend(
window.textsecure.messaging.syncReadMessages(batch, sendOptions),
{ messageIds, sendType: 'readSync' }
);
})
);
} catch (err: unknown) {
if (!(err instanceof Error)) {

View File

@ -36,6 +36,8 @@ import { SignalService as Proto } from '../protobuf';
const THIRTY_SECONDS = 30 * 1000;
const MAX_MESSAGE_SIZE = 64 * 1024;
export class IncomingWebSocketRequest {
private readonly id: Long | number;
@ -207,6 +209,10 @@ export default class WebSocketResource extends EventTarget {
});
});
strictAssert(
bytes.length <= MAX_MESSAGE_SIZE,
'WebSocket request byte size exceeded'
);
this.socket.sendBytes(Buffer.from(bytes));
return promise;
@ -291,6 +297,10 @@ export default class WebSocketResource extends EventTarget {
(bytes: Buffer): void => {
this.removeActive(incomingRequest);
strictAssert(
bytes.length <= MAX_MESSAGE_SIZE,
'WebSocket response byte size exceeded'
);
this.socket.sendBytes(bytes);
}
);

View File

@ -1,7 +1,7 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { groupBy, map } from 'lodash';
import { chunk, groupBy, map } from 'lodash';
import { ConversationAttributesType } from '../model-types.d';
import { getSendOptions } from './getSendOptions';
import { handleMessageSend } from './handleMessageSend';
@ -16,6 +16,8 @@ type ReceiptSpecType = {
hasErrors: boolean;
};
const CHUNK_SIZE = 100;
export async function sendReadReceiptsFor(
conversationAttrs: ConversationAttributesType,
items: Array<ReceiptSpecType>
@ -31,23 +33,31 @@ export async function sendReadReceiptsFor(
await Promise.all(
map(receiptsBySender, async (receipts, senderId) => {
const timestamps = map(receipts, item => item.timestamp);
const messageIds = map(receipts, item => item.messageId);
const conversation = window.ConversationController.get(senderId);
if (conversation) {
await handleMessageSend(
window.textsecure.messaging.sendReadReceipts({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
senderE164: conversation.get('e164')!,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
senderUuid: conversation.get('uuid')!,
timestamps,
options: sendOptions,
}),
{ messageIds, sendType: 'readReceipt' }
);
if (!conversation) {
return;
}
const batches = chunk(receipts, CHUNK_SIZE);
await Promise.all(
batches.map(batch => {
const timestamps = map(batch, item => item.timestamp);
const messageIds = map(batch, item => item.messageId);
return handleMessageSend(
window.textsecure.messaging.sendReadReceipts({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
senderE164: conversation.get('e164')!,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
senderUuid: conversation.get('uuid')!,
timestamps,
options: sendOptions,
}),
{ messageIds, sendType: 'readReceipt' }
);
})
);
})
);
}