Ensure messages are sent in order, even with errors

This commit is contained in:
Evan Hahn 2021-09-07 15:39:14 -05:00 committed by GitHub
parent 634f4a8bb7
commit a3eed6191e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 348 additions and 244 deletions

View file

@ -1,6 +1,7 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue';
import { v4 as uuid } from 'uuid';
import { noop } from 'lodash';
@ -60,6 +61,8 @@ export abstract class JobQueue<T> {
}
>();
private readonly defaultInMemoryQueue = new PQueue();
private started = false;
constructor(options: Readonly<JobQueueOptions>) {
@ -172,6 +175,10 @@ export abstract class JobQueue<T> {
return new Job(id, timestamp, this.queueType, data, completion);
}
protected getInMemoryQueue(_parsedJob: ParsedJob<T>): PQueue {
return this.defaultInMemoryQueue;
}
private async enqueueStoredJob(storedJob: Readonly<StoredJob>) {
assert(
storedJob.queueType === this.queueType,
@ -205,38 +212,46 @@ export abstract class JobQueue<T> {
data: parsedData,
};
const queue: PQueue = this.getInMemoryQueue(parsedJob);
const logger = new JobLogger(parsedJob, this.logger);
let result:
const result:
| undefined
| { success: true }
| { success: false; err: unknown };
| { success: false; err: unknown } = await queue.add(async () => {
for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) {
const isFinalAttempt = attempt === this.maxAttempts;
for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) {
logger.attempt = attempt;
logger.attempt = attempt;
log.info(
`${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}`
);
try {
// We want an `await` in the loop, as we don't want a single job running more
// than once at a time. Ideally, the job will succeed on the first attempt.
// eslint-disable-next-line no-await-in-loop
await this.run(parsedJob, { attempt, log: logger });
result = { success: true };
log.info(
`${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`
);
break;
} catch (err: unknown) {
result = { success: false, err };
log.error(
`${this.logPrefix} job ${
storedJob.id
} failed on attempt ${attempt}. ${Errors.toLogFormat(err)}`
`${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}`
);
try {
// We want an `await` in the loop, as we don't want a single job running more
// than once at a time. Ideally, the job will succeed on the first attempt.
// eslint-disable-next-line no-await-in-loop
await this.run(parsedJob, { attempt, log: logger });
log.info(
`${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`
);
return { success: true };
} catch (err: unknown) {
log.error(
`${this.logPrefix} job ${
storedJob.id
} failed on attempt ${attempt}. ${Errors.toLogFormat(err)}`
);
if (isFinalAttempt) {
return { success: false, err };
}
}
}
}
// This should never happen. See the assertion below.
return undefined;
});
await this.store.delete(storedJob.id);

View file

@ -95,25 +95,25 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
return { messageId, conversationId };
}
private getQueue(queueKey: string): PQueue {
const existingQueue = this.queues.get(queueKey);
protected getInMemoryQueue({
data,
}: Readonly<{ data: NormalMessageSendJobData }>): PQueue {
const { conversationId } = data;
const existingQueue = this.queues.get(conversationId);
if (existingQueue) {
return existingQueue;
}
const newQueue = new PQueue({ concurrency: 1 });
newQueue.once('idle', () => {
this.queues.delete(queueKey);
this.queues.delete(conversationId);
});
this.queues.set(queueKey, newQueue);
this.queues.set(conversationId, newQueue);
return newQueue;
}
private enqueue(queueKey: string, fn: () => Promise<void>): Promise<void> {
return this.getQueue(queueKey).add(fn);
}
protected async run(
{
data,
@ -121,248 +121,244 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
}: Readonly<{ data: NormalMessageSendJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
const { messageId, conversationId } = data;
const { messageId } = data;
await this.enqueue(conversationId, async () => {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
// We don't immediately use this value because we may want to mark the message
// failed before doing so.
const shouldContinue = await commonShouldJobContinue({
attempt,
log,
timeRemaining,
// We don't immediately use this value because we may want to mark the message
// failed before doing so.
const shouldContinue = await commonShouldJobContinue({
attempt,
log,
timeRemaining,
});
await window.ConversationController.loadPromise();
const message = await getMessageById(messageId);
if (!message) {
log.info(
`message ${messageId} was not found, maybe because it was deleted. Giving up on sending it`
);
return;
}
if (!isOutgoing(message.attributes)) {
log.error(
`message ${messageId} was not an outgoing message to begin with. This is probably a bogus job. Giving up on sending it`
);
return;
}
if (message.isErased() || message.get('deletedForEveryone')) {
log.info(`message ${messageId} was erased. Giving up on sending it`);
return;
}
let messageSendErrors: Array<Error> = [];
// We don't want to save errors on messages unless we're giving up. If it's our
// final attempt, we know upfront that we want to give up. However, we might also
// want to give up if (1) we get a 508 from the server, asking us to please stop
// (2) we get a 428 from the server, flagging the message for spam (3) some other
// reason not known at the time of this writing.
//
// This awkward callback lets us hold onto errors we might want to save, so we can
// decide whether to save them later on.
const saveErrors = isFinalAttempt
? undefined
: (errors: Array<Error>) => {
messageSendErrors = errors;
};
if (!shouldContinue) {
log.info(`message ${messageId} ran out of time. Giving up on sending it`);
await markMessageFailed(message, messageSendErrors);
return;
}
try {
const conversation = message.getConversation();
if (!conversation) {
throw new Error(
`could not find conversation for message with ID ${messageId}`
);
}
const {
allRecipientIdentifiers,
recipientIdentifiersWithoutMe,
untrustedConversationIds,
} = getMessageRecipients({
message,
conversation,
});
await window.ConversationController.loadPromise();
const message = await getMessageById(messageId);
if (!message) {
if (untrustedConversationIds.length) {
log.info(
`message ${messageId} was not found, maybe because it was deleted. Giving up on sending it`
`message ${messageId} sending blocked because ${untrustedConversationIds.length} conversation(s) were untrusted. Giving up on the job, but it may be reborn later`
);
window.reduxActions.conversations.messageStoppedByMissingVerification(
messageId,
untrustedConversationIds
);
return;
}
if (!isOutgoing(message.attributes)) {
log.error(
`message ${messageId} was not an outgoing message to begin with. This is probably a bogus job. Giving up on sending it`
if (!allRecipientIdentifiers.length) {
log.warn(
`trying to send message ${messageId} but it looks like it was already sent to everyone. This is unexpected, but we're giving up`
);
return;
}
if (message.isErased() || message.get('deletedForEveryone')) {
log.info(`message ${messageId} was erased. Giving up on sending it`);
return;
}
const {
attachments,
body,
deletedForEveryoneTimestamp,
expireTimer,
mentions,
messageTimestamp,
preview,
profileKey,
quote,
sticker,
} = await getMessageSendData({ conversation, message });
let messageSendErrors: Array<Error> = [];
let messageSendPromise: Promise<unknown>;
// We don't want to save errors on messages unless we're giving up. If it's our
// final attempt, we know upfront that we want to give up. However, we might also
// want to give up if (1) we get a 508 from the server, asking us to please stop
// (2) we get a 428 from the server, flagging the message for spam (3) some other
// reason not known at the time of this writing.
//
// This awkward callback lets us hold onto errors we might want to save, so we can
// decide whether to save them later on.
const saveErrors = isFinalAttempt
? undefined
: (errors: Array<Error>) => {
messageSendErrors = errors;
};
if (!shouldContinue) {
log.info(
`message ${messageId} ran out of time. Giving up on sending it`
);
await markMessageFailed(message, messageSendErrors);
return;
}
try {
const conversation = message.getConversation();
if (!conversation) {
throw new Error(
`could not find conversation for message with ID ${messageId}`
);
}
const {
allRecipientIdentifiers,
recipientIdentifiersWithoutMe,
untrustedConversationIds,
} = getMessageRecipients({
message,
conversation,
});
if (untrustedConversationIds.length) {
log.info(
`message ${messageId} sending blocked because ${untrustedConversationIds.length} conversation(s) were untrusted. Giving up on the job, but it may be reborn later`
);
window.reduxActions.conversations.messageStoppedByMissingVerification(
messageId,
untrustedConversationIds
);
return;
}
if (!allRecipientIdentifiers.length) {
log.warn(
`trying to send message ${messageId} but it looks like it was already sent to everyone. This is unexpected, but we're giving up`
);
return;
}
const {
if (recipientIdentifiersWithoutMe.length === 0) {
log.info('sending sync message only');
const dataMessage = await window.textsecure.messaging.getDataMessage({
attachments,
body,
deletedForEveryoneTimestamp,
expireTimer,
mentions,
messageTimestamp,
preview,
profileKey,
quote,
recipients: allRecipientIdentifiers,
sticker,
} = await getMessageSendData({ conversation, message });
let messageSendPromise: Promise<unknown>;
if (recipientIdentifiersWithoutMe.length === 0) {
log.info('sending sync message only');
const dataMessage = await window.textsecure.messaging.getDataMessage({
attachments,
body,
deletedForEveryoneTimestamp,
expireTimer,
preview,
profileKey,
quote,
recipients: allRecipientIdentifiers,
sticker,
timestamp: messageTimestamp,
});
messageSendPromise = message.sendSyncMessageOnly(
dataMessage,
saveErrors
);
} else {
const conversationType = conversation.get('type');
const sendOptions = await getSendOptions(conversation.attributes);
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
let innerPromise: Promise<CallbackResultType>;
if (conversationType === Message.GROUP) {
log.info('sending group message');
innerPromise = window.Signal.Util.sendToGroup({
groupSendOptions: {
attachments,
deletedForEveryoneTimestamp,
expireTimer,
groupV1: updateRecipients(
conversation.getGroupV1Info(),
recipientIdentifiersWithoutMe
),
groupV2: updateRecipients(
conversation.getGroupV2Info(),
recipientIdentifiersWithoutMe
),
messageText: body,
preview,
profileKey,
quote,
sticker,
timestamp: messageTimestamp,
mentions,
},
conversation,
contentHint: ContentHint.RESENDABLE,
messageId,
sendOptions,
sendType: 'message',
});
} else {
log.info('sending direct message');
innerPromise = window.textsecure.messaging.sendMessageToIdentifier({
identifier: recipientIdentifiersWithoutMe[0],
messageText: body,
attachments,
quote,
preview,
sticker,
reaction: null,
deletedForEveryoneTimestamp,
timestamp: messageTimestamp,
expireTimer,
contentHint: ContentHint.RESENDABLE,
groupId: undefined,
profileKey,
options: sendOptions,
});
}
messageSendPromise = message.send(
handleMessageSend(innerPromise, {
messageIds: [messageId],
sendType: 'message',
}),
saveErrors
);
}
await messageSendPromise;
if (
getLastChallengeError({
errors: messageSendErrors,
})
) {
log.info(
`message ${messageId} hit a spam challenge. Not retrying any more`
);
await message.saveErrors(messageSendErrors);
return;
}
const didFullySend =
!messageSendErrors.length || didSendToEveryone(message);
if (!didFullySend) {
throw new Error('message did not fully send');
}
} catch (err: unknown) {
const serverAskedUsToStop: boolean = messageSendErrors.some(
(messageSendError: unknown) =>
messageSendError instanceof Error &&
parseIntWithFallback(messageSendError.code, -1) === 508
timestamp: messageTimestamp,
});
messageSendPromise = message.sendSyncMessageOnly(
dataMessage,
saveErrors
);
} else {
const conversationType = conversation.get('type');
const sendOptions = await getSendOptions(conversation.attributes);
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
if (isFinalAttempt || serverAskedUsToStop) {
await markMessageFailed(message, messageSendErrors);
}
if (serverAskedUsToStop) {
log.info('server responded with 508. Giving up on this job');
return;
}
if (!isFinalAttempt) {
const maybe413Error: undefined | Error = messageSendErrors.find(
(messageSendError: unknown) =>
messageSendError instanceof Error && messageSendError.code === 413
);
await sleepFor413RetryAfterTimeIfApplicable({
err: maybe413Error,
log,
timeRemaining,
let innerPromise: Promise<CallbackResultType>;
if (conversationType === Message.GROUP) {
log.info('sending group message');
innerPromise = window.Signal.Util.sendToGroup({
groupSendOptions: {
attachments,
deletedForEveryoneTimestamp,
expireTimer,
groupV1: updateRecipients(
conversation.getGroupV1Info(),
recipientIdentifiersWithoutMe
),
groupV2: updateRecipients(
conversation.getGroupV2Info(),
recipientIdentifiersWithoutMe
),
messageText: body,
preview,
profileKey,
quote,
sticker,
timestamp: messageTimestamp,
mentions,
},
conversation,
contentHint: ContentHint.RESENDABLE,
messageId,
sendOptions,
sendType: 'message',
});
} else {
log.info('sending direct message');
innerPromise = window.textsecure.messaging.sendMessageToIdentifier({
identifier: recipientIdentifiersWithoutMe[0],
messageText: body,
attachments,
quote,
preview,
sticker,
reaction: null,
deletedForEveryoneTimestamp,
timestamp: messageTimestamp,
expireTimer,
contentHint: ContentHint.RESENDABLE,
groupId: undefined,
profileKey,
options: sendOptions,
});
}
throw err;
messageSendPromise = message.send(
handleMessageSend(innerPromise, {
messageIds: [messageId],
sendType: 'message',
}),
saveErrors
);
}
});
await messageSendPromise;
if (
getLastChallengeError({
errors: messageSendErrors,
})
) {
log.info(
`message ${messageId} hit a spam challenge. Not retrying any more`
);
await message.saveErrors(messageSendErrors);
return;
}
const didFullySend =
!messageSendErrors.length || didSendToEveryone(message);
if (!didFullySend) {
throw new Error('message did not fully send');
}
} catch (err: unknown) {
const serverAskedUsToStop: boolean = messageSendErrors.some(
(messageSendError: unknown) =>
messageSendError instanceof Error &&
parseIntWithFallback(messageSendError.code, -1) === 508
);
if (isFinalAttempt || serverAskedUsToStop) {
await markMessageFailed(message, messageSendErrors);
}
if (serverAskedUsToStop) {
log.info('server responded with 508. Giving up on this job');
return;
}
if (!isFinalAttempt) {
const maybe413Error: undefined | Error = messageSendErrors.find(
(messageSendError: unknown) =>
messageSendError instanceof Error && messageSendError.code === 413
);
await sleepFor413RetryAfterTimeIfApplicable({
err: maybe413Error,
log,
timeRemaining,
});
}
throw err;
}
}
}

View file

@ -8,6 +8,7 @@ import EventEmitter, { once } from 'events';
import { z } from 'zod';
import { noop, groupBy } from 'lodash';
import { v4 as uuid } from 'uuid';
import PQueue from 'p-queue';
import { JobError } from '../../jobs/JobError';
import { TestJobQueueStore } from './TestJobQueueStore';
import { missingCaseError } from '../../util/missingCaseError';
@ -67,6 +68,98 @@ describe('JobQueue', () => {
assert.isEmpty(store.storedJobs);
});
it('by default, kicks off multiple jobs in parallel', async () => {
let activeJobCount = 0;
const eventBus = new EventEmitter();
const updateActiveJobCount = (incrementBy: number): void => {
activeJobCount += incrementBy;
eventBus.emit('updated');
};
class Queue extends JobQueue<number> {
parseData(data: unknown): number {
return z.number().parse(data);
}
async run(): Promise<void> {
try {
updateActiveJobCount(1);
await new Promise<void>(resolve => {
eventBus.on('updated', () => {
if (activeJobCount === 4) {
eventBus.emit('got to 4');
resolve();
}
});
});
} finally {
updateActiveJobCount(-1);
}
}
}
const store = new TestJobQueueStore();
const queue = new Queue({
store,
queueType: 'test queue',
maxAttempts: 100,
});
queue.streamJobs();
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
await once(eventBus, 'got to 4');
});
it('can override the in-memory queue', async () => {
let jobsAdded = 0;
const testQueue = new PQueue();
testQueue.on('add', () => {
jobsAdded += 1;
});
class Queue extends JobQueue<number> {
parseData(data: unknown): number {
return z.number().parse(data);
}
protected getInMemoryQueue(parsedJob: ParsedJob<number>): PQueue {
assert(
new Set([1, 2, 3, 4]).has(parsedJob.data),
'Bad data passed to `getInMemoryQueue`'
);
return testQueue;
}
run(): Promise<void> {
return Promise.resolve();
}
}
const store = new TestJobQueueStore();
const queue = new Queue({
store,
queueType: 'test queue',
maxAttempts: 100,
});
queue.streamJobs();
const jobs = await Promise.all([
queue.add(1),
queue.add(2),
queue.add(3),
queue.add(4),
]);
await Promise.all(jobs.map(job => job.completion));
assert.strictEqual(jobsAdded, 4);
});
it('writes jobs to the database correctly', async () => {
const store = new TestJobQueueStore();