diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index dfa2dc99e..b3d02e9f5 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -204,24 +204,20 @@ export default class MessageReceiver this.incomingQueue = new PQueue({ concurrency: 1, - timeout: 1000 * 60 * 2, throwOnTimeout: true, }); this.appQueue = new PQueue({ concurrency: 1, - timeout: 1000 * 60 * 2, throwOnTimeout: true, }); // All envelopes start in encryptedQueue and progress to decryptedQueue this.encryptedQueue = new PQueue({ concurrency: 1, - timeout: 1000 * 60 * 2, throwOnTimeout: true, }); this.decryptedQueue = new PQueue({ concurrency: 1, - timeout: 1000 * 60 * 2, throwOnTimeout: true, }); @@ -255,9 +251,11 @@ export default class MessageReceiver request.respond(200, 'OK'); if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') { - this.incomingQueue.add(() => { - this.onEmpty(); - }); + this.incomingQueue.add( + createTaskWithTimeout(async () => { + this.onEmpty(); + }, 'incomingQueue/onEmpty') + ); } return; } @@ -326,12 +324,19 @@ export default class MessageReceiver } }; - this.incomingQueue.add(job); + this.incomingQueue.add( + createTaskWithTimeout(job, 'incomingQueue/websocket') + ); } public reset(): void { // We always process our cache before processing a new websocket message - this.incomingQueue.add(async () => this.queueAllCached()); + this.incomingQueue.add( + createTaskWithTimeout( + async () => this.queueAllCached(), + 'incomingQueue/queueAllCached' + ) + ); this.count = 0; this.isEmptied = false; @@ -348,14 +353,24 @@ export default class MessageReceiver public async drain(): Promise { const waitForEncryptedQueue = async () => - this.addToQueue(async () => { - log.info('drained'); - }, TaskType.Decrypted); + this.addToQueue( + async () => { + log.info('drained'); + }, + 'drain/waitForDecrypted', + TaskType.Decrypted + ); const waitForIncomingQueue = async () => - this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted); + this.addToQueue( + waitForEncryptedQueue, + 'drain/waitForEncrypted', + TaskType.Encrypted + ); - return this.incomingQueue.add(waitForIncomingQueue); + return this.incomingQueue.add( + createTaskWithTimeout(waitForIncomingQueue, 'drain/waitForIncoming') + ); } // @@ -508,7 +523,12 @@ export default class MessageReceiver // private async dispatchAndWait(event: Event): Promise { - this.appQueue.add(async () => Promise.all(this.dispatchEvent(event))); + this.appQueue.add( + createTaskWithTimeout( + async () => Promise.all(this.dispatchEvent(event)), + 'dispatchEvent' + ) + ); } private calculateMessageAge( @@ -542,6 +562,7 @@ export default class MessageReceiver private async addToQueue( task: () => Promise, + id: string, taskType: TaskType ): Promise { if (taskType === TaskType.Encrypted) { @@ -554,7 +575,7 @@ export default class MessageReceiver : this.decryptedQueue; try { - return await queue.add(task); + return await queue.add(createTaskWithTimeout(task, id)); } finally { this.updateProgress(this.count); } @@ -580,24 +601,34 @@ export default class MessageReceiver ); // We don't await here because we don't want this to gate future message processing - this.appQueue.add(emitEmpty); + this.appQueue.add(createTaskWithTimeout(emitEmpty, 'emitEmpty')); }; const waitForEncryptedQueue = async () => { - this.addToQueue(waitForDecryptedQueue, TaskType.Decrypted); + this.addToQueue( + waitForDecryptedQueue, + 'onEmpty/waitForDecrypted', + TaskType.Decrypted + ); }; - const waitForIncomingQueue = () => { - this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted); - + const waitForIncomingQueue = async () => { // Note: this.count is used in addToQueue // Resetting count so everything from the websocket after this starts at zero this.count = 0; + + this.addToQueue( + waitForEncryptedQueue, + 'onEmpty/waitForEncrypted', + TaskType.Encrypted + ); }; const waitForCacheAddBatcher = async () => { await this.decryptAndCacheBatcher.onIdle(); - this.incomingQueue.add(waitForIncomingQueue); + this.incomingQueue.add( + createTaskWithTimeout(waitForIncomingQueue, 'onEmpty/waitForIncoming') + ); }; waitForCacheAddBatcher(); @@ -675,9 +706,13 @@ export default class MessageReceiver } // Maintain invariant: encrypted queue => decrypted queue - this.addToQueue(async () => { - this.queueDecryptedEnvelope(envelope, payloadPlaintext); - }, TaskType.Encrypted); + this.addToQueue( + async () => { + this.queueDecryptedEnvelope(envelope, payloadPlaintext); + }, + 'queueDecryptedEnvelope', + TaskType.Encrypted + ); } else { this.queueCachedEnvelope(item, envelope); } @@ -729,7 +764,12 @@ export default class MessageReceiver if (this.isEmptied) { this.clearRetryTimeout(); this.retryCachedTimeout = setTimeout(() => { - this.incomingQueue.add(async () => this.queueAllCached()); + this.incomingQueue.add( + createTaskWithTimeout( + async () => this.queueAllCached(), + 'queueAllCached' + ) + ); }, RETRY_TIMEOUT); } } @@ -967,7 +1007,11 @@ export default class MessageReceiver ); try { - await this.addToQueue(taskWithTimeout, TaskType.Decrypted); + await this.addToQueue( + taskWithTimeout, + 'dispatchEvent', + TaskType.Decrypted + ); } catch (error) { log.error( `queueDecryptedEnvelope error handling envelope ${id}:`, @@ -984,7 +1028,7 @@ export default class MessageReceiver let logId = this.getEnvelopeId(envelope); log.info(`queueing ${uuidKind} envelope`, logId); - const task = createTaskWithTimeout(async (): Promise => { + const task = async (): Promise => { const unsealedEnvelope = await this.unsealEnvelope( stores, envelope, @@ -1000,14 +1044,19 @@ export default class MessageReceiver this.addToQueue( async () => this.dispatchEvent(new EnvelopeEvent(unsealedEnvelope)), + 'dispatchEvent', TaskType.Decrypted ); return this.decryptEnvelope(stores, unsealedEnvelope, uuidKind); - }, `MessageReceiver: unseal and decrypt ${logId}`); + }; try { - return await this.addToQueue(task, TaskType.Encrypted); + return await this.addToQueue( + task, + `MessageReceiver: unseal and decrypt ${logId}`, + TaskType.Encrypted + ); } catch (error) { const args = [ 'queueEncryptedEnvelope error handling envelope', @@ -1630,6 +1679,7 @@ export default class MessageReceiver // Avoid deadlocks by scheduling processing on decrypted queue this.addToQueue( async () => this.dispatchEvent(event), + 'decrypted/dispatchEvent', TaskType.Decrypted ); } else { diff --git a/ts/textsecure/TaskWithTimeout.ts b/ts/textsecure/TaskWithTimeout.ts index 23806a409..3bb1c2cf4 100644 --- a/ts/textsecure/TaskWithTimeout.ts +++ b/ts/textsecure/TaskWithTimeout.ts @@ -12,9 +12,11 @@ type TaskType = { }; const tasks = new Set(); +let shouldStartTimers = true; export function suspendTasksWithTimeout(): void { log.info(`TaskWithTimeout: suspending ${tasks.size} tasks`); + shouldStartTimers = false; for (const task of tasks) { task.suspend(); } @@ -22,6 +24,7 @@ export function suspendTasksWithTimeout(): void { export function resumeTasksWithTimeout(): void { log.info(`TaskWithTimeout: resuming ${tasks.size} tasks`); + shouldStartTimers = true; for (const task of tasks) { task.resume(); } @@ -75,7 +78,9 @@ export default function createTaskWithTimeout>( }; tasks.add(entry); - startTimer(); + if (shouldStartTimers) { + startTimer(); + } let result: unknown;