Use concurrency of one for queues with no options

This commit is contained in:
Scott Nonnenberg 2022-07-20 19:40:53 -07:00 committed by GitHub
parent 4cd05dc6c9
commit 1f65a4491f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 22 deletions

View File

@ -441,17 +441,18 @@ export async function startApp(): Promise<void> {
timeout: durations.MINUTE * 30,
});
// Note: this queue is meant to allow for stop/start of tasks, not limit parallelism.
const profileKeyResponseQueue = new window.PQueue();
profileKeyResponseQueue.pause();
const lightSessionResetQueue = new window.PQueue();
const lightSessionResetQueue = new window.PQueue({ concurrency: 1 });
window.Signal.Services.lightSessionResetQueue = lightSessionResetQueue;
lightSessionResetQueue.pause();
const onDecryptionErrorQueue = new window.PQueue();
const onDecryptionErrorQueue = new window.PQueue({ concurrency: 1 });
onDecryptionErrorQueue.pause();
const onRetryRequestQueue = new window.PQueue();
const onRetryRequestQueue = new window.PQueue({ concurrency: 1 });
onRetryRequestQueue.pause();
window.Whisper.deliveryReceiptQueue = new window.PQueue({

View File

@ -62,7 +62,7 @@ export abstract class JobQueue<T> {
}
>();
private readonly defaultInMemoryQueue = new PQueue();
private readonly defaultInMemoryQueue = new PQueue({ concurrency: 1 });
private started = false;

View File

@ -17,6 +17,7 @@ import type { LoggerType } from '../../types/Logging';
import { JobQueue } from '../../jobs/JobQueue';
import type { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types';
import { sleep } from '../../util';
describe('JobQueue', () => {
describe('end-to-end tests', () => {
@ -68,12 +69,12 @@ describe('JobQueue', () => {
assert.isEmpty(store.storedJobs);
});
it('by default, kicks off multiple jobs in parallel', async () => {
it('by default, kicks off one job at a time', async () => {
let maxActive = 0;
let activeJobCount = 0;
const eventBus = new EventEmitter();
const updateActiveJobCount = (incrementBy: number): void => {
activeJobCount += incrementBy;
eventBus.emit('updated');
maxActive = Math.max(activeJobCount, maxActive);
};
class Queue extends JobQueue<number> {
@ -84,14 +85,7 @@ describe('JobQueue', () => {
async run(): Promise<void> {
try {
updateActiveJobCount(1);
await new Promise<void>(resolve => {
eventBus.on('updated', () => {
if (activeJobCount === 4) {
eventBus.emit('got to 4');
resolve();
}
});
});
sleep(1);
} finally {
updateActiveJobCount(-1);
}
@ -107,12 +101,22 @@ describe('JobQueue', () => {
});
queue.streamJobs();
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
const createPromise1 = queue.add(1);
const createPromise2 = queue.add(2);
const createPromise3 = queue.add(3);
const createPromise4 = queue.add(4);
await once(eventBus, 'got to 4');
const { completion: promise1 } = await createPromise1;
const { completion: promise2 } = await createPromise2;
const { completion: promise3 } = await createPromise3;
const { completion: promise4 } = await createPromise4;
await promise1;
await promise2;
await promise3;
await promise4;
assert.strictEqual(1, maxActive);
});
it('can override the in-memory queue', async () => {

View File

@ -599,10 +599,10 @@ function scheduleSessionReset(senderUuid: string, senderDevice: number) {
);
}
lightSessionResetQueue.add(() => {
lightSessionResetQueue.add(async () => {
const ourUuid = window.textsecure.storage.user.getCheckedUuid();
window.textsecure.storage.protocol.lightSessionReset(
await window.textsecure.storage.protocol.lightSessionReset(
new QualifiedAddress(ourUuid, Address.create(senderUuid, senderDevice))
);
});