// Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { once, noop } from 'lodash'; /** * You can do two things with an async queue: * * 1. Put values in. * 2. Consume values out in the order they were added. * * Values are removed from the queue when they're consumed. * * There can only be one consumer, though this could be changed. * * See the tests to see how this works. */ export class AsyncQueue implements AsyncIterable { private onAdd: () => void = noop; private queue: Array = []; private isReading = false; add(value: Readonly): void { this.queue.push(value); this.onAdd(); } async *[Symbol.asyncIterator](): AsyncIterator { if (this.isReading) { throw new Error('Cannot iterate over a queue more than once'); } this.isReading = true; while (true) { yield* this.queue; this.queue = []; // We want to iterate over the queue in series. // eslint-disable-next-line no-await-in-loop await new Promise(resolve => { this.onAdd = once(resolve); }); } } }