Use `throwOnTimeout` option for PQueue

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2021-11-24 06:38:24 -08:00 committed by GitHub
parent ccf0100906
commit 2d6a398120
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 54 additions and 10 deletions

View File

@ -781,7 +781,11 @@ export class ConversationController {
`ConversationController: Removing ${temporaryConversations.length} temporary conversations` `ConversationController: Removing ${temporaryConversations.length} temporary conversations`
); );
} }
const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 }); const queue = new PQueue({
concurrency: 3,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
queue.addAll( queue.addAll(
temporaryConversations.map(item => async () => { temporaryConversations.map(item => async () => {
await removeConversation(item.id, { await removeConversation(item.id, {

View File

@ -516,7 +516,11 @@ export class SignalProtocolStore extends EventsMixin {
} }
private _createSenderKeyQueue(): PQueue { private _createSenderKeyQueue(): PQueue {
return new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); return new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
} }
private _getSenderKeyQueue(senderId: QualifiedAddress): PQueue { private _getSenderKeyQueue(senderId: QualifiedAddress): PQueue {
@ -663,7 +667,11 @@ export class SignalProtocolStore extends EventsMixin {
} }
private _createSessionQueue(): PQueue { private _createSessionQueue(): PQueue {
return new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); return new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
} }
private _getSessionQueue(id: QualifiedAddress): PQueue { private _getSessionQueue(id: QualifiedAddress): PQueue {

View File

@ -100,7 +100,11 @@ const makeImagePath = (src: string) => {
return `${ROOT_PATH}node_modules/emoji-datasource-apple/img/apple/64/${src}`; return `${ROOT_PATH}node_modules/emoji-datasource-apple/img/apple/64/${src}`;
}; };
const imageQueue = new PQueue({ concurrency: 10, timeout: 1000 * 60 * 2 }); const imageQueue = new PQueue({
concurrency: 10,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
const images = new Set(); const images = new Set();
export const preloadImages = async (): Promise<void> => { export const preloadImages = async (): Promise<void> => {

View File

@ -75,7 +75,11 @@ export async function routineProfileRefresh({
} }
} }
const refreshQueue = new PQueue({ concurrency: 5, timeout: 1000 * 60 * 2 }); const refreshQueue = new PQueue({
concurrency: 5,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
for (const conversation of conversationsToRefresh) { for (const conversation of conversationsToRefresh) {
refreshQueue.add(() => refreshConversation(conversation)); refreshQueue.add(() => refreshConversation(conversation));
} }

View File

@ -200,17 +200,27 @@ export default class MessageReceiver
} }
this.serverTrustRoot = Bytes.fromBase64(serverTrustRoot); this.serverTrustRoot = Bytes.fromBase64(serverTrustRoot);
this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); this.incomingQueue = new PQueue({
this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
this.appQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
// All envelopes start in encryptedQueue and progress to decryptedQueue // All envelopes start in encryptedQueue and progress to decryptedQueue
this.encryptedQueue = new PQueue({ this.encryptedQueue = new PQueue({
concurrency: 1, concurrency: 1,
timeout: 1000 * 60 * 2, timeout: 1000 * 60 * 2,
throwOnTimeout: true,
}); });
this.decryptedQueue = new PQueue({ this.decryptedQueue = new PQueue({
concurrency: 1, concurrency: 1,
timeout: 1000 * 60 * 2, timeout: 1000 * 60 * 2,
throwOnTimeout: true,
}); });
this.decryptAndCacheBatcher = createBatcher<CacheAddItemType>({ this.decryptAndCacheBatcher = createBatcher<CacheAddItemType>({

View File

@ -2015,7 +2015,11 @@ export function initialize({
}); });
// Upload stickers // Upload stickers
const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 }); const queue = new PQueue({
concurrency: 3,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
await Promise.all( await Promise.all(
stickers.map(async (sticker: ServerAttachmentType, index: number) => { stickers.map(async (sticker: ServerAttachmentType, index: number) => {
const stickerParams = makePutParams( const stickerParams = makePutParams(

View File

@ -44,7 +44,11 @@ export function createBatcher<ItemType>(
let batcher: BatcherType<ItemType>; let batcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null; let timeout: NodeJS.Timeout | null;
let items: Array<ItemType> = []; let items: Array<ItemType> = [];
const queue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); const queue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
function _kickBatchOff() { function _kickBatchOff() {
if (timeout) { if (timeout) {

View File

@ -7,6 +7,7 @@ import { Sound } from './Sound';
const ringtoneEventQueue = new PQueue({ const ringtoneEventQueue = new PQueue({
concurrency: 1, concurrency: 1,
timeout: 1000 * 60 * 2, timeout: 1000 * 60 * 2,
throwOnTimeout: true,
}); });
class CallingTones { class CallingTones {

View File

@ -649,6 +649,7 @@ export async function _waitForAll<T>({
const queue = new PQueue({ const queue = new PQueue({
concurrency: maxConcurrency, concurrency: maxConcurrency,
timeout: 2 * 60 * 1000, timeout: 2 * 60 * 1000,
throwOnTimeout: true,
}); });
return queue.addAll(tasks); return queue.addAll(tasks);
} }

View File

@ -62,7 +62,11 @@ export function createWaitBatcher<ItemType>(
let waitBatcher: BatcherType<ItemType>; let waitBatcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null; let timeout: NodeJS.Timeout | null;
let items: Array<ItemHolderType<ItemType>> = []; let items: Array<ItemHolderType<ItemType>> = [];
const queue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); const queue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
async function _kickBatchOff() { async function _kickBatchOff() {
const itemsRef = items; const itemsRef = items;