Refactor contact sync processing

This commit is contained in:
Fedor Indutny 2022-08-24 22:04:42 -07:00 committed by GitHub
parent 76e73f63dc
commit 7ce4beb270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 229 additions and 156 deletions

View File

@ -1,7 +1,6 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue';
import Backbone from 'backbone';
import { ipcRenderer as ipc } from 'electron';
@ -17,7 +16,6 @@ window.ROOT_PATH = window.location.href.startsWith('file') ? '../../' : '/';
window.getEnvironment = getEnvironment;
window.getVersion = () => window.SignalContext.config.version;
window.PQueue = PQueue;
window.Backbone = Backbone;
window.localeMessages = ipc.sendSync('locale-data');

View File

@ -6,6 +6,7 @@ import { isNumber } from 'lodash';
import { bindActionCreators } from 'redux';
import { render } from 'react-dom';
import { batch as batchDispatch } from 'react-redux';
import PQueue from 'p-queue';
import MessageReceiver from './textsecure/MessageReceiver';
import type {
@ -21,7 +22,6 @@ import type {
MessageAttributesType,
ConversationAttributesType,
ReactionAttributesType,
ValidateConversationType,
} from './model-types.d';
import * as Bytes from './Bytes';
import * as Timers from './Timers';
@ -64,6 +64,7 @@ import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue';
import { ourProfileKeyService } from './services/ourProfileKey';
import { notificationService } from './services/notifications';
import { areWeASubscriberService } from './services/areWeASubscriber';
import { onContactSync, setIsInitialSync } from './services/contactSync';
import { startTimeTravelDetector } from './util/startTimeTravelDetector';
import { shouldRespondWithProfileKey } from './util/shouldRespondWithProfileKey';
import { LatestQueue } from './util/LatestQueue';
@ -71,7 +72,6 @@ import { parseIntOrThrow } from './util/parseIntOrThrow';
import { getProfile } from './util/getProfile';
import type {
ConfigurationEvent,
ContactEvent,
DecryptionErrorEvent,
DeliveryEvent,
EnvelopeEvent,
@ -153,7 +153,6 @@ import { SeenStatus } from './MessageSeenStatus';
import MessageSender from './textsecure/SendMessage';
import type AccountManager from './textsecure/AccountManager';
import { onStoryRecipientUpdate } from './util/onStoryRecipientUpdate';
import { validateConversation } from './util/validateConversation';
const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000;
@ -314,13 +313,9 @@ export async function startApp(): Promise<void> {
'delivery',
queuedEventListener(onDeliveryReceipt)
);
messageReceiver.addEventListener(
'contact',
queuedEventListener(onContactReceived, false)
);
messageReceiver.addEventListener(
'contactSync',
queuedEventListener(onContactSyncComplete)
queuedEventListener(onContactSync)
);
messageReceiver.addEventListener(
'group',
@ -430,26 +425,26 @@ export async function startApp(): Promise<void> {
areWeASubscriberService.update(window.storage, server);
});
const eventHandlerQueue = new window.PQueue({
const eventHandlerQueue = new PQueue({
concurrency: 1,
timeout: durations.MINUTE * 30,
});
// Note: this queue is meant to allow for stop/start of tasks, not limit parallelism.
const profileKeyResponseQueue = new window.PQueue();
const profileKeyResponseQueue = new PQueue();
profileKeyResponseQueue.pause();
const lightSessionResetQueue = new window.PQueue({ concurrency: 1 });
const lightSessionResetQueue = new PQueue({ concurrency: 1 });
window.Signal.Services.lightSessionResetQueue = lightSessionResetQueue;
lightSessionResetQueue.pause();
const onDecryptionErrorQueue = new window.PQueue({ concurrency: 1 });
const onDecryptionErrorQueue = new PQueue({ concurrency: 1 });
onDecryptionErrorQueue.pause();
const onRetryRequestQueue = new window.PQueue({ concurrency: 1 });
const onRetryRequestQueue = new PQueue({ concurrency: 1 });
onRetryRequestQueue.pause();
window.Whisper.deliveryReceiptQueue = new window.PQueue({
window.Whisper.deliveryReceiptQueue = new PQueue({
concurrency: 1,
timeout: durations.MINUTE * 30,
});
@ -2022,10 +2017,6 @@ export async function startApp(): Promise<void> {
}
}
// When true - we are running the very first storage and contact sync after
// linking.
let isInitialSync = false;
let connectCount = 0;
let connecting = false;
async function connect(firstRun?: boolean) {
@ -2040,7 +2031,7 @@ export async function startApp(): Promise<void> {
connecting = true;
// Reset the flag and update it below if needed
isInitialSync = false;
setIsInitialSync(false);
log.info('connect', { firstRun, connectCount });
@ -2286,7 +2277,7 @@ export async function startApp(): Promise<void> {
const contactSyncComplete = waitForEvent('contactSync:complete');
log.info('firstRun: requesting initial sync');
isInitialSync = true;
setIsInitialSync(true);
// Request configuration, block, GV1 sync messages, contacts
// (only avatars and inboxPosition),and Storage Service sync.
@ -2321,7 +2312,7 @@ export async function startApp(): Promise<void> {
}
log.info('firstRun: initial sync complete');
isInitialSync = false;
setIsInitialSync(false);
// Switch to inbox view even if contact sync is still running
if (
@ -2717,92 +2708,6 @@ export async function startApp(): Promise<void> {
});
}
async function onContactSyncComplete() {
log.info('onContactSyncComplete');
await window.storage.put('synced_at', Date.now());
window.Whisper.events.trigger('contactSync:complete');
}
// Note: Like the handling for incoming/outgoing messages, this method is synchronous,
// deferring its async logic to the function passed to conversation.queueJob().
function onContactReceived(ev: ContactEvent) {
const details = ev.contactDetails;
const partialConversation: ValidateConversationType = {
e164: details.number,
uuid: UUID.cast(details.uuid),
type: 'private',
};
const validationError = validateConversation(partialConversation);
if (validationError) {
log.error(
'Invalid contact received:',
Errors.toLogFormat(validationError)
);
return;
}
const conversation = window.ConversationController.maybeMergeContacts({
e164: details.number,
aci: details.uuid,
reason: 'onContactReceived',
});
strictAssert(conversation, 'need conversation to queue the job!');
// It's important to use queueJob here because we might update the expiration timer
// and we don't want conflicts with incoming message processing happening on the
// conversation queue.
conversation.queueJob('onContactReceived', async () => {
try {
conversation.set({
name: details.name,
inbox_position: details.inboxPosition,
});
// Update the conversation avatar only if new avatar exists and hash differs
const { avatar } = details;
if (avatar && avatar.data) {
const newAttributes = await Conversation.maybeUpdateAvatar(
conversation.attributes,
avatar.data,
{
writeNewAttachmentData,
deleteAttachmentData,
doesAttachmentExist,
}
);
conversation.set(newAttributes);
} else {
const { attributes } = conversation;
if (attributes.avatar && attributes.avatar.path) {
await deleteAttachmentData(attributes.avatar.path);
}
conversation.set({ avatar: null });
}
window.Signal.Data.updateConversation(conversation.attributes);
// expireTimer isn't in Storage Service so we have to rely on contact sync.
const { expireTimer } = details;
const isValidExpireTimer = typeof expireTimer === 'number';
if (isValidExpireTimer) {
await conversation.updateExpirationTimer(expireTimer, {
source: window.ConversationController.getOurConversationId(),
receivedAt: ev.receivedAtCounter,
fromSync: true,
isInitialSync,
reason: 'contact sync',
});
}
window.Whisper.events.trigger('incrementProgress');
} catch (error) {
log.error('onContactReceived error:', Errors.toLogFormat(error));
}
});
}
async function onGroupSyncComplete() {
log.info('onGroupSyncComplete');
await window.storage.put('synced_at', Date.now());

View File

@ -4,6 +4,7 @@
import { compact, has, isNumber, throttle, debounce } from 'lodash';
import { batch as batchDispatch } from 'react-redux';
import { v4 as generateGuid } from 'uuid';
import PQueue from 'p-queue';
import type {
ConversationAttributesType,
@ -191,9 +192,9 @@ export class ConversationModel extends window.Backbone
inProgressFetch?: Promise<unknown>;
newMessageQueue?: typeof window.PQueueType;
newMessageQueue?: PQueue;
jobQueue?: typeof window.PQueueType;
jobQueue?: PQueue;
storeName?: string | null;
@ -1313,7 +1314,7 @@ export class ConversationModel extends window.Backbone
private async beforeAddSingleMessage(): Promise<void> {
if (!this.newMessageQueue) {
this.newMessageQueue = new window.PQueue({
this.newMessageQueue = new PQueue({
concurrency: 1,
timeout: durations.MINUTE * 30,
});
@ -3427,7 +3428,7 @@ export class ConversationModel extends window.Backbone
name: string,
callback: (abortSignal: AbortSignal) => Promise<T>
): Promise<T> {
this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 });
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });
const taskWithTimeout = createTaskWithTimeout(
callback,

179
ts/services/contactSync.ts Normal file
View File

@ -0,0 +1,179 @@
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue';
import type { ContactSyncEvent } from '../textsecure/messageReceiverEvents';
import type { ModifiedContactDetails } from '../textsecure/ContactsParser';
import { UUID } from '../types/UUID';
import * as Conversation from '../types/Conversation';
import * as Errors from '../types/errors';
import type { ValidateConversationType } from '../model-types.d';
import type { ConversationModel } from '../models/conversations';
import { validateConversation } from '../util/validateConversation';
import { strictAssert } from '../util/assert';
import { isDirectConversation, isMe } from '../util/whatTypeOfConversation';
import * as log from '../logging/log';
// When true - we are running the very first storage and contact sync after
// linking.
let isInitialSync = false;
export function setIsInitialSync(newValue: boolean): void {
log.info(`setIsInitialSync(${newValue})`);
isInitialSync = newValue;
}
async function updateConversationFromContactSync(
conversation: ConversationModel,
details: ModifiedContactDetails,
receivedAtCounter: number
): Promise<void> {
const { writeNewAttachmentData, deleteAttachmentData, doesAttachmentExist } =
window.Signal.Migrations;
conversation.set({
name: details.name,
inbox_position: details.inboxPosition,
});
// Update the conversation avatar only if new avatar exists and hash differs
const { avatar } = details;
if (avatar && avatar.data) {
const newAttributes = await Conversation.maybeUpdateAvatar(
conversation.attributes,
avatar.data,
{
writeNewAttachmentData,
deleteAttachmentData,
doesAttachmentExist,
}
);
conversation.set(newAttributes);
} else {
const { attributes } = conversation;
if (attributes.avatar && attributes.avatar.path) {
await deleteAttachmentData(attributes.avatar.path);
}
conversation.set({ avatar: null });
}
// expireTimer isn't in Storage Service so we have to rely on contact sync.
const { expireTimer } = details;
const isValidExpireTimer = typeof expireTimer === 'number';
if (isValidExpireTimer) {
await conversation.updateExpirationTimer(expireTimer, {
source: window.ConversationController.getOurConversationId(),
receivedAt: receivedAtCounter,
fromSync: true,
isInitialSync,
reason: 'contact sync',
});
}
window.Whisper.events.trigger('incrementProgress');
}
const queue = new PQueue({ concurrency: 1 });
async function doContactSync({
contacts,
receivedAtCounter,
}: ContactSyncEvent): Promise<void> {
log.info(
`doContactSync(${receivedAtCounter}): got ${contacts.length} contacts`
);
const updatedConversations = new Set<ConversationModel>();
let promises = new Array<Promise<void>>();
for (const details of contacts) {
const partialConversation: ValidateConversationType = {
e164: details.number,
uuid: UUID.cast(details.uuid),
type: 'private',
};
const validationError = validateConversation(partialConversation);
if (validationError) {
log.error(
`doContactSync(${receivedAtCounter}): Invalid contact received`,
Errors.toLogFormat(validationError)
);
continue;
}
const conversation = window.ConversationController.maybeMergeContacts({
e164: details.number,
aci: details.uuid,
reason: `doContactSync(${receivedAtCounter})`,
});
strictAssert(conversation, 'need conversation to queue the job!');
// It's important to use queueJob here because we might update the expiration timer
// and we don't want conflicts with incoming message processing happening on the
// conversation queue.
const job = conversation.queueJob(
`doContactSync(${receivedAtCounter}).set`,
async () => {
try {
await updateConversationFromContactSync(
conversation,
details,
receivedAtCounter
);
updatedConversations.add(conversation);
} catch (error) {
log.error(
'updateConversationFromContactSync error:',
Errors.toLogFormat(error)
);
}
}
);
promises.push(job);
}
// updatedConversations are not populated until the promises are resolved
await Promise.all(promises);
promises = [];
const notUpdated = window.ConversationController.getAll().filter(
convo =>
!updatedConversations.has(convo) &&
isDirectConversation(convo.attributes) &&
!isMe(convo.attributes)
);
log.info(
`doContactSync(${receivedAtCounter}): ` +
`updated ${updatedConversations.size} ` +
`resetting ${notUpdated.length}`
);
for (const conversation of notUpdated) {
conversation.set({
name: undefined,
inbox_position: undefined,
});
}
// Save new conversation attributes
promises.push(
window.Signal.Data.updateConversations(
[...updatedConversations, ...notUpdated].map(convo => convo.attributes)
)
);
await Promise.all(promises);
await window.storage.put('synced_at', Date.now());
window.Whisper.events.trigger('contactSync:complete');
}
export async function onContactSync(ev: ContactSyncEvent): Promise<void> {
log.info(`onContactSync(${ev.receivedAtCounter}): queueing sync`);
await queue.add(() => doContactSync(ev));
}

View File

@ -6,6 +6,7 @@
import { ipcRenderer as ipc } from 'electron';
import fs from 'fs-extra';
import pify from 'pify';
import PQueue from 'p-queue';
import {
compact,
@ -1490,7 +1491,7 @@ async function removeAllMessagesInConversation(
log.info(`removeAllMessagesInConversation/${logId}: Cleanup...`);
// Note: It's very important that these models are fully hydrated because
// we need to delete all associated on-disk files along with the database delete.
const queue = new window.PQueue({ concurrency: 3, timeout: MINUTE * 30 });
const queue = new PQueue({ concurrency: 3, timeout: MINUTE * 30 });
queue.addAll(
messages.map(
(message: MessageType) => async () => cleanupMessage(message)

View File

@ -30,10 +30,12 @@ export type ModifiedGroupDetails = MessageWithAvatar<Proto.GroupDetails>;
export type ModifiedContactDetails = MessageWithAvatar<Proto.ContactDetails>;
class ParserBase<
abstract class ParserBase<
Message extends OptionalAvatar,
Decoder extends DecoderBase<Message>
> {
Decoder extends DecoderBase<Message>,
Result
> implements Iterable<Result>
{
protected readonly reader: protobuf.Reader;
constructor(bytes: Uint8Array, private readonly decoder: Decoder) {
@ -83,17 +85,28 @@ class ParserBase<
return undefined;
}
}
public abstract next(): Result | undefined;
*[Symbol.iterator](): Iterator<Result> {
let result = this.next();
while (result !== undefined) {
yield result;
result = this.next();
}
}
}
export class GroupBuffer extends ParserBase<
Proto.GroupDetails,
typeof Proto.GroupDetails
typeof Proto.GroupDetails,
ModifiedGroupDetails
> {
constructor(arrayBuffer: Uint8Array) {
super(arrayBuffer, Proto.GroupDetails);
}
public next(): ModifiedGroupDetails | undefined {
public override next(): ModifiedGroupDetails | undefined {
const proto = this.decodeDelimited();
if (!proto) {
return undefined;
@ -120,13 +133,14 @@ export class GroupBuffer extends ParserBase<
export class ContactBuffer extends ParserBase<
Proto.ContactDetails,
typeof Proto.ContactDetails
typeof Proto.ContactDetails,
ModifiedContactDetails
> {
constructor(arrayBuffer: Uint8Array) {
super(arrayBuffer, Proto.ContactDetails);
}
public next(): ModifiedContactDetails | undefined {
public override next(): ModifiedContactDetails | undefined {
const proto = this.decodeDelimited();
if (!proto) {
return undefined;

View File

@ -104,7 +104,6 @@ import {
StickerPackEvent,
ReadSyncEvent,
ViewSyncEvent,
ContactEvent,
ContactSyncEvent,
GroupEvent,
GroupSyncEvent,
@ -561,11 +560,6 @@ export default class MessageReceiver
handler: (ev: ViewSyncEvent) => void
): void;
public override addEventListener(
name: 'contact',
handler: (ev: ContactEvent) => void
): void;
public override addEventListener(
name: 'contactSync',
handler: (ev: ContactSyncEvent) => void
@ -2748,6 +2742,9 @@ export default class MessageReceiver
return this.handleSentMessage(envelope, sentMessage);
}
if (syncMessage.contacts) {
// Note: we do not return here because we don't want to block the next
// message on this attachment download and a lot of processing of that
// attachment.
this.handleContacts(envelope, syncMessage.contacts);
return;
}
@ -3068,26 +3065,14 @@ export default class MessageReceiver
this.removeFromCache(envelope);
// Note: we do not return here because we don't want to block the next message on
// this attachment download and a lot of processing of that attachment.
const attachmentPointer = await this.handleAttachment(blob);
const results = [];
const contactBuffer = new ContactBuffer(attachmentPointer.data);
let contactDetails = contactBuffer.next();
while (contactDetails !== undefined) {
const contactEvent = new ContactEvent(
contactDetails,
envelope.receivedAtCounter
);
results.push(this.dispatchAndWait(contactEvent));
contactDetails = contactBuffer.next();
}
await Promise.all(results);
const finalEvent = new ContactSyncEvent();
await this.dispatchAndWait(finalEvent);
const contactSync = new ContactSyncEvent(
Array.from(contactBuffer),
envelope.receivedAtCounter
);
await this.dispatchAndWait(contactSync);
log.info('handleContacts: finished');
}

View File

@ -73,17 +73,11 @@ export class ErrorEvent extends Event {
}
}
export class ContactEvent extends Event {
export class ContactSyncEvent extends Event {
constructor(
public readonly contactDetails: ModifiedContactDetails,
public readonly contacts: ReadonlyArray<ModifiedContactDetails>,
public readonly receivedAtCounter: number
) {
super('contact');
}
}
export class ContactSyncEvent extends Event {
constructor() {
super('contactSync');
}
}

2
ts/window.d.ts vendored
View File

@ -243,8 +243,6 @@ declare global {
getAutoLaunch: () => Promise<boolean>;
setAutoLaunch: (value: boolean) => Promise<void>;
PQueue: typeof PQueue;
PQueueType: PQueue;
Mustache: {
render: (template: string, data: any, partials?: any) => string;
parse: (template: string) => void;

View File

@ -7,7 +7,6 @@ import * as React from 'react';
import * as ReactDOM from 'react-dom';
import * as moment from 'moment';
import 'moment/min/locales.min';
import PQueue from 'p-queue';
import { textsecure } from '../../textsecure';
import { imageToBlurHash } from '../../util/imageToBlurHash';
@ -44,7 +43,6 @@ window.libphonenumberFormat = PhoneNumberFormat;
window.React = React;
window.ReactDOM = ReactDOM;
window.PQueue = PQueue;
const { locale } = config;
moment.updateLocale(locale, {