Increment unprocessed attempts when fetching

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2022-04-28 16:51:38 -07:00 committed by GitHub
parent 93b26b972b
commit f235d48e99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 41 deletions

View File

@ -1859,9 +1859,9 @@ export class SignalProtocolStore extends EventsMixin {
});
}
getAllUnprocessed(): Promise<Array<UnprocessedType>> {
getAllUnprocessedAndIncrementAttempts(): Promise<Array<UnprocessedType>> {
return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => {
return window.Signal.Data.getAllUnprocessed();
return window.Signal.Data.getAllUnprocessedAndIncrementAttempts();
});
}

View File

@ -247,7 +247,7 @@ const dataInterface: ClientInterface = {
migrateConversationMessages,
getUnprocessedCount,
getAllUnprocessed,
getAllUnprocessedAndIncrementAttempts,
getUnprocessedById,
updateUnprocessedWithData,
updateUnprocessedsWithData,
@ -1443,8 +1443,8 @@ async function getUnprocessedCount() {
return channels.getUnprocessedCount();
}
async function getAllUnprocessed() {
return channels.getAllUnprocessed();
async function getAllUnprocessedAndIncrementAttempts() {
return channels.getAllUnprocessedAndIncrementAttempts();
}
async function getUnprocessedById(id: string) {

View File

@ -474,7 +474,7 @@ export type DataInterface = {
) => Promise<void>;
getUnprocessedCount: () => Promise<number>;
getAllUnprocessed: () => Promise<Array<UnprocessedType>>;
getAllUnprocessedAndIncrementAttempts: () => Promise<Array<UnprocessedType>>;
updateUnprocessedWithData: (
id: string,
data: UnprocessedUpdateType

View File

@ -244,7 +244,7 @@ const dataInterface: ServerInterface = {
migrateConversationMessages,
getUnprocessedCount,
getAllUnprocessed,
getAllUnprocessedAndIncrementAttempts,
updateUnprocessedWithData,
updateUnprocessedsWithData,
getUnprocessedById,
@ -3181,32 +3181,58 @@ async function getUnprocessedCount(): Promise<number> {
return getCountFromTable(getInstance(), 'unprocessed');
}
async function getAllUnprocessed(): Promise<Array<UnprocessedType>> {
async function getAllUnprocessedAndIncrementAttempts(): Promise<
Array<UnprocessedType>
> {
const db = getInstance();
const { changes: deletedCount } = db
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
.run({
monthAgo: Date.now() - durations.MONTH,
});
return db.transaction(() => {
const { changes: deletedStaleCount } = db
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
.run({
monthAgo: Date.now() - durations.MONTH,
});
if (deletedCount !== 0) {
logger.warn(
`getAllUnprocessed: deleting ${deletedCount} old unprocessed envelopes`
);
}
if (deletedStaleCount !== 0) {
logger.warn(
'getAllUnprocessedAndIncrementAttempts: ' +
`deleting ${deletedStaleCount} old unprocessed envelopes`
);
}
const rows = db
.prepare<EmptyQuery>(
db.prepare<EmptyQuery>(
`
SELECT *
FROM unprocessed
ORDER BY timestamp ASC;
UPDATE unprocessed
SET attempts = attempts + 1
`
)
.all();
).run();
return rows;
const { changes: deletedInvalidCount } = db
.prepare<Query>(
`
DELETE FROM unprocessed
WHERE attempts >= $MAX_UNPROCESSED_ATTEMPTS
`
)
.run({ MAX_UNPROCESSED_ATTEMPTS });
if (deletedInvalidCount !== 0) {
logger.warn(
'getAllUnprocessedAndIncrementAttempts: ' +
`deleting ${deletedInvalidCount} invalid unprocessed envelopes`
);
}
return db
.prepare<EmptyQuery>(
`
SELECT *
FROM unprocessed
ORDER BY timestamp ASC;
`
)
.all();
})();
}
function removeUnprocessedsSync(ids: Array<string>): void {

View File

@ -1499,7 +1499,8 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
const allUnprocessed = await store.getAllUnprocessed();
const allUnprocessed =
await store.getAllUnprocessedAndIncrementAttempts();
assert.deepEqual(
allUnprocessed.map(({ envelope }) => envelope),
['second']
@ -1551,7 +1552,7 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
assert.deepEqual(await store.getAllUnprocessed(), []);
assert.deepEqual(await store.getAllUnprocessedAndIncrementAttempts(), []);
});
it('can be re-entered', async () => {
@ -1647,7 +1648,7 @@ describe('SignalProtocolStore', () => {
beforeEach(async () => {
await store.removeAllUnprocessed();
const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});
@ -1687,7 +1688,7 @@ describe('SignalProtocolStore', () => {
}),
]);
const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 3);
// they are in the proper order because the collection comparator is 'timestamp'
@ -1708,10 +1709,11 @@ describe('SignalProtocolStore', () => {
});
await store.updateUnprocessedWithData(id, { decrypted: 'updated' });
const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 1);
assert.strictEqual(items[0].decrypted, 'updated');
assert.strictEqual(items[0].timestamp, NOW + 1);
assert.strictEqual(items[0].attempts, 1);
});
it('removeUnprocessed successfully deletes item', async () => {
@ -1726,7 +1728,21 @@ describe('SignalProtocolStore', () => {
});
await store.removeUnprocessed(id);
const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});
it('getAllUnprocessedAndIncrementAttempts deletes items', async () => {
await store.addUnprocessed({
id: '1-one',
envelope: 'first',
timestamp: NOW + 1,
receivedAtCounter: 0,
version: 2,
attempts: 3,
});
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});
});

View File

@ -802,17 +802,11 @@ export default class MessageReceiver
return [];
}
const items = await this.storage.protocol.getAllUnprocessed();
const items =
await this.storage.protocol.getAllUnprocessedAndIncrementAttempts();
log.info('getAllFromCache loaded', items.length, 'saved envelopes');
return items.map(item => {
const { attempts = 0 } = item;
return {
...item,
attempts: attempts + 1,
};
});
return items;
}
private async decryptAndCacheBatch(