Run resumed zone jobs concurrently

This commit is contained in:
Fedor Indutny 2021-08-24 14:07:40 -07:00 committed by GitHub
parent e15227aa7c
commit b7ccd12245
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 7 deletions

View File

@ -204,6 +204,11 @@ const EventsMixin = (function EventsMixin(this: unknown) {
type SessionCacheEntry = CacheEntryType<SessionType, SessionRecord>;
type ZoneQueueEntryType = Readonly<{
zone: Zone;
callback(): void;
}>;
export class SignalProtocolStore extends EventsMixin {
// Enums used across the app
@ -236,7 +241,7 @@ export class SignalProtocolStore extends EventsMixin {
private currentZoneDepth = 0;
private readonly zoneQueue: Array<() => void> = [];
private readonly zoneQueue: Array<ZoneQueueEntryType> = [];
private pendingSessions = new Map<string, SessionCacheEntry>();
@ -659,7 +664,7 @@ export class SignalProtocolStore extends EventsMixin {
);
return new Promise<T>((resolve, reject) => {
this.zoneQueue.push(async () => {
const callback = async () => {
const duration = Date.now() - start;
window.log.info(`${debugName}: unlocked after ${duration}ms`);
@ -670,7 +675,9 @@ export class SignalProtocolStore extends EventsMixin {
} catch (error) {
reject(error);
}
});
};
this.zoneQueue.push({ zone, callback });
});
}
@ -750,7 +757,7 @@ export class SignalProtocolStore extends EventsMixin {
this.currentZone = zone;
if (zone !== GLOBAL_ZONE) {
window.log.info(`enterZone(${zone.name}:${name})`);
window.log.info(`SignalProtocolStore.enterZone(${zone.name}:${name})`);
}
}
}
@ -769,13 +776,31 @@ export class SignalProtocolStore extends EventsMixin {
}
if (zone !== GLOBAL_ZONE) {
window.log.info(`leaveZone(${zone.name})`);
window.log.info(`SignalProtocolStore.leaveZone(${zone.name})`);
}
this.currentZone = undefined;
const next = this.zoneQueue.shift();
if (next) {
next();
if (!next) {
return;
}
const toEnter = [next];
while (this.zoneQueue[0]?.zone === next.zone) {
const elem = this.zoneQueue.shift();
assert(elem, 'Zone element should be present');
toEnter.push(elem);
}
window.log.info(
`SignalProtocolStore: running blocked ${toEnter.length} jobs in ` +
`zone ${next.zone.name}`
);
for (const { callback } of toEnter) {
callback();
}
}

View File

@ -1501,6 +1501,40 @@ describe('SignalProtocolStore', () => {
await store.archiveSiblingSessions(id, { zone });
});
it('can be concurrently re-entered after waiting', async () => {
const a = new Zone('a');
const b = new Zone('b');
const order: Array<number> = [];
const promises: Array<Promise<unknown>> = [];
// 1. Enter zone "a"
// 2. Wait for zone "a" to be left to enter zone "b" twice
// 3. Verify that both zone "b" tasks ran in parallel
promises.push(store.withZone(a, 'a', async () => order.push(1)));
promises.push(
store.withZone(b, 'b', async () => {
order.push(2);
await Promise.resolve();
order.push(22);
})
);
promises.push(
store.withZone(b, 'b', async () => {
order.push(3);
await Promise.resolve();
order.push(33);
})
);
await Promise.resolve();
await Promise.resolve();
await Promise.all(promises);
assert.deepEqual(order, [1, 2, 3, 22, 33]);
});
});
describe('Not yet processed messages', () => {