JobQueue: If job data does not parse, delete it from database
Co-authored-by: Scott Nonnenberg <scott@signal.org>
This commit is contained in:
parent
8614638eb9
commit
307b6976ad
|
@ -93,7 +93,7 @@ export abstract class JobQueue<T> {
|
||||||
* takes a single number, `parseData` should throw if `data` is a number and should
|
* takes a single number, `parseData` should throw if `data` is a number and should
|
||||||
* return the number otherwise.
|
* return the number otherwise.
|
||||||
*
|
*
|
||||||
* If it throws, the job will be deleted from the store and the job will not be run.
|
* If it throws, the job will be deleted from the database and the job will not be run.
|
||||||
*
|
*
|
||||||
* Will only be called once per job, even if `maxAttempts > 1`.
|
* Will only be called once per job, even if `maxAttempts > 1`.
|
||||||
*/
|
*/
|
||||||
|
@ -205,9 +205,10 @@ export abstract class JobQueue<T> {
|
||||||
parsedData = this.parseData(storedJob.data);
|
parsedData = this.parseData(storedJob.data);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log.error(
|
log.error(
|
||||||
`${this.logPrefix} failed to parse data for job ${storedJob.id}`,
|
`${this.logPrefix} failed to parse data for job ${storedJob.id}, created ${storedJob.timestamp}. Deleting job. Parse error:`,
|
||||||
Errors.toLogFormat(err)
|
Errors.toLogFormat(err)
|
||||||
);
|
);
|
||||||
|
await this.store.delete(storedJob.id);
|
||||||
reject(
|
reject(
|
||||||
new Error(
|
new Error(
|
||||||
'Failed to parse job data. Was unexpected data loaded from the database?'
|
'Failed to parse job data. Was unexpected data loaded from the database?'
|
||||||
|
|
|
@ -513,7 +513,7 @@ describe('JobQueue', () => {
|
||||||
sinon.assert.calledWithMatch(run, { data: 'valid' });
|
sinon.assert.calledWithMatch(run, { data: 'valid' });
|
||||||
});
|
});
|
||||||
|
|
||||||
it('keeps jobs in the storage if parseData throws', async () => {
|
it('deletes jobs from storage if parseData throws', async () => {
|
||||||
const store = new TestJobQueueStore();
|
const store = new TestJobQueueStore();
|
||||||
|
|
||||||
class TestQueue extends JobQueue<string> {
|
class TestQueue extends JobQueue<string> {
|
||||||
|
@ -539,9 +539,10 @@ describe('JobQueue', () => {
|
||||||
|
|
||||||
await (await queue.add('invalid 1')).completion.catch(noop);
|
await (await queue.add('invalid 1')).completion.catch(noop);
|
||||||
await (await queue.add('invalid 2')).completion.catch(noop);
|
await (await queue.add('invalid 2')).completion.catch(noop);
|
||||||
|
await queue.add('valid');
|
||||||
|
|
||||||
const datas = store.storedJobs.map(job => job.data);
|
const datas = store.storedJobs.map(job => job.data);
|
||||||
assert.sameMembers(datas, ['invalid 1', 'invalid 2']);
|
assert.sameMembers(datas, ['valid']);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('adding the job resolves AFTER inserting the job into the database', async () => {
|
it('adding the job resolves AFTER inserting the job into the database', async () => {
|
||||||
|
|
Loading…
Reference in New Issue