From 1f65a4491f01e663b95e70481278830237e93279 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Wed, 20 Jul 2022 19:40:53 -0700 Subject: [PATCH] Use concurrency of one for queues with no options --- ts/background.ts | 7 +++--- ts/jobs/JobQueue.ts | 2 +- ts/test-node/jobs/JobQueue_test.ts | 36 +++++++++++++++++------------- ts/util/handleRetry.ts | 4 ++-- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/ts/background.ts b/ts/background.ts index ee58cf891..e5f4ec272 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -441,17 +441,18 @@ export async function startApp(): Promise { timeout: durations.MINUTE * 30, }); + // Note: this queue is meant to allow for stop/start of tasks, not limit parallelism. const profileKeyResponseQueue = new window.PQueue(); profileKeyResponseQueue.pause(); - const lightSessionResetQueue = new window.PQueue(); + const lightSessionResetQueue = new window.PQueue({ concurrency: 1 }); window.Signal.Services.lightSessionResetQueue = lightSessionResetQueue; lightSessionResetQueue.pause(); - const onDecryptionErrorQueue = new window.PQueue(); + const onDecryptionErrorQueue = new window.PQueue({ concurrency: 1 }); onDecryptionErrorQueue.pause(); - const onRetryRequestQueue = new window.PQueue(); + const onRetryRequestQueue = new window.PQueue({ concurrency: 1 }); onRetryRequestQueue.pause(); window.Whisper.deliveryReceiptQueue = new window.PQueue({ diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index 68cb79464..a908077f8 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -62,7 +62,7 @@ export abstract class JobQueue { } >(); - private readonly defaultInMemoryQueue = new PQueue(); + private readonly defaultInMemoryQueue = new PQueue({ concurrency: 1 }); private started = false; diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index 20cebfdef..ab0ba4ce2 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -17,6 +17,7 @@ import type { LoggerType } from '../../types/Logging'; import { JobQueue } from '../../jobs/JobQueue'; import type { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types'; +import { sleep } from '../../util'; describe('JobQueue', () => { describe('end-to-end tests', () => { @@ -68,12 +69,12 @@ describe('JobQueue', () => { assert.isEmpty(store.storedJobs); }); - it('by default, kicks off multiple jobs in parallel', async () => { + it('by default, kicks off one job at a time', async () => { + let maxActive = 0; let activeJobCount = 0; - const eventBus = new EventEmitter(); const updateActiveJobCount = (incrementBy: number): void => { activeJobCount += incrementBy; - eventBus.emit('updated'); + maxActive = Math.max(activeJobCount, maxActive); }; class Queue extends JobQueue { @@ -84,14 +85,7 @@ describe('JobQueue', () => { async run(): Promise { try { updateActiveJobCount(1); - await new Promise(resolve => { - eventBus.on('updated', () => { - if (activeJobCount === 4) { - eventBus.emit('got to 4'); - resolve(); - } - }); - }); + sleep(1); } finally { updateActiveJobCount(-1); } @@ -107,12 +101,22 @@ describe('JobQueue', () => { }); queue.streamJobs(); - queue.add(1); - queue.add(2); - queue.add(3); - queue.add(4); + const createPromise1 = queue.add(1); + const createPromise2 = queue.add(2); + const createPromise3 = queue.add(3); + const createPromise4 = queue.add(4); - await once(eventBus, 'got to 4'); + const { completion: promise1 } = await createPromise1; + const { completion: promise2 } = await createPromise2; + const { completion: promise3 } = await createPromise3; + const { completion: promise4 } = await createPromise4; + + await promise1; + await promise2; + await promise3; + await promise4; + + assert.strictEqual(1, maxActive); }); it('can override the in-memory queue', async () => { diff --git a/ts/util/handleRetry.ts b/ts/util/handleRetry.ts index 817cdbc66..495c1fa88 100644 --- a/ts/util/handleRetry.ts +++ b/ts/util/handleRetry.ts @@ -599,10 +599,10 @@ function scheduleSessionReset(senderUuid: string, senderDevice: number) { ); } - lightSessionResetQueue.add(() => { + lightSessionResetQueue.add(async () => { const ourUuid = window.textsecure.storage.user.getCheckedUuid(); - window.textsecure.storage.protocol.lightSessionReset( + await window.textsecure.storage.protocol.lightSessionReset( new QualifiedAddress(ourUuid, Address.create(senderUuid, senderDevice)) ); });