New queue for view once syncs, handleRetry improvements

This commit is contained in:
Scott Nonnenberg 2022-02-08 09:30:42 -08:00 committed by GitHub
parent 571ee3cab6
commit 0a18cc50bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 271 additions and 115 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2017-2021 Signal Messenger, LLC
// Copyright 2017-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* global Whisper, window */
@ -18,6 +18,7 @@ try {
// It is important to call this as early as possible
const { SignalContext } = require('./ts/windows/context');
window.i18n = SignalContext.i18n;
const { getEnvironment, Environment } = require('./ts/environment');
const ipc = electron.ipcRenderer;
@ -421,7 +422,6 @@ try {
const Attachments = require('./ts/windows/attachments');
const { locale } = config;
window.i18n = SignalContext.i18n;
window.moment.updateLocale(locale, {
relativeTime: {
s: window.i18n('timestamp_s'),

View File

@ -2968,9 +2968,9 @@ export async function startApp(): Promise<void> {
}
log.info(
'onProfileKeyUpdate: updating profileKey',
data.source,
data.sourceUuid
'onProfileKeyUpdate: updating profileKey for',
data.sourceUuid,
data.source
);
await conversation.setProfileKey(data.profileKey);

View File

@ -1,4 +1,4 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { chunk } from 'lodash';
@ -12,6 +12,7 @@ import { isRecord } from '../../util/isRecord';
import { commonShouldJobContinue } from './commonShouldJobContinue';
import { handleCommonJobRequestError } from './handleCommonJobRequestError';
import { missingCaseError } from '../../util/missingCaseError';
const CHUNK_SIZE = 100;
@ -21,6 +22,11 @@ export type SyncType = {
senderUuid?: string;
timestamp: number;
};
export enum SyncTypeList {
Read = 'Read',
View = 'View',
ViewOnceOpen = 'ViewOnceOpen',
}
/**
* Parse what _should_ be an array of `SyncType`s.
@ -55,16 +61,16 @@ function parseOptionalString(name: string, value: unknown): undefined | string {
throw new Error(`${name} was not a string`);
}
export async function runReadOrViewSyncJob({
export async function runSyncJob({
attempt,
isView,
type,
log,
maxRetryTime,
syncs,
timestamp,
}: Readonly<{
attempt: number;
isView: boolean;
type: SyncTypeList;
log: LoggerType;
maxRetryTime: number;
syncs: ReadonlyArray<SyncType>;
@ -76,10 +82,19 @@ export async function runReadOrViewSyncJob({
}
let sendType: SendTypesType;
if (isView) {
sendType = 'viewSync';
} else {
sendType = 'readSync';
switch (type) {
case SyncTypeList.View:
sendType = 'viewSync';
break;
case SyncTypeList.Read:
sendType = 'readSync';
break;
case SyncTypeList.ViewOnceOpen:
sendType = 'viewOnceSync';
break;
default: {
throw missingCaseError(type);
}
}
const syncTimestamps = syncs.map(sync => sync.timestamp);
@ -108,15 +123,27 @@ export async function runReadOrViewSyncJob({
let doSync:
| typeof window.textsecure.messaging.syncReadMessages
| typeof window.textsecure.messaging.syncView;
if (isView) {
doSync = window.textsecure.messaging.syncView.bind(
window.textsecure.messaging
);
} else {
doSync = window.textsecure.messaging.syncReadMessages.bind(
window.textsecure.messaging
);
| typeof window.textsecure.messaging.syncView
| typeof window.textsecure.messaging.syncViewOnceOpen;
switch (type) {
case SyncTypeList.View:
doSync = window.textsecure.messaging.syncView.bind(
window.textsecure.messaging
);
break;
case SyncTypeList.Read:
doSync = window.textsecure.messaging.syncReadMessages.bind(
window.textsecure.messaging
);
break;
case SyncTypeList.ViewOnceOpen:
doSync = window.textsecure.messaging.syncViewOnceOpen.bind(
window.textsecure.messaging
);
break;
default: {
throw missingCaseError(type);
}
}
try {

View File

@ -11,6 +11,7 @@ import { readSyncJobQueue } from './readSyncJobQueue';
import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue';
import { reportSpamJobQueue } from './reportSpamJobQueue';
import { singleProtoJobQueue } from './singleProtoJobQueue';
import { viewOnceOpenJobQueue } from './viewOnceOpenJobQueue';
import { viewSyncJobQueue } from './viewSyncJobQueue';
import { viewedReceiptsJobQueue } from './viewedReceiptsJobQueue';
@ -24,14 +25,24 @@ export function initializeAllJobQueues({
}): void {
reportSpamJobQueue.initialize({ server });
deliveryReceiptsJobQueue.streamJobs();
// General conversation send queue
normalMessageSendJobQueue.streamJobs();
reactionJobQueue.streamJobs();
// Single proto send queue, used for a variety of one-off simple messages
singleProtoJobQueue.streamJobs();
// Syncs to others
deliveryReceiptsJobQueue.streamJobs();
readReceiptsJobQueue.streamJobs();
viewedReceiptsJobQueue.streamJobs();
viewOnceOpenJobQueue.streamJobs();
// Syncs to ourselves
readSyncJobQueue.streamJobs();
viewSyncJobQueue.streamJobs();
// Other queues
removeStorageKeyJobQueue.streamJobs();
reportSpamJobQueue.streamJobs();
singleProtoJobQueue.streamJobs();
viewSyncJobQueue.streamJobs();
viewedReceiptsJobQueue.streamJobs();
}

View File

@ -1,14 +1,15 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as durations from '../util/durations';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import type { SyncType } from './helpers/readAndViewSyncHelpers';
import type { SyncType } from './helpers/syncHelpers';
import {
SyncTypeList,
parseRawSyncDataArray,
runReadOrViewSyncJob,
} from './helpers/readAndViewSyncHelpers';
runSyncJob,
} from './helpers/syncHelpers';
import { strictAssert } from '../util/assert';
import { isRecord } from '../util/isRecord';
@ -31,13 +32,13 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
{ data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
await runReadOrViewSyncJob({
await runSyncJob({
attempt,
isView: false,
log,
maxRetryTime: MAX_RETRY_TIME,
syncs: data.readSyncs,
timestamp,
type: SyncTypeList.Read,
});
}
}

View File

@ -0,0 +1,53 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as durations from '../util/durations';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import type { SyncType } from './helpers/syncHelpers';
import {
SyncTypeList,
parseRawSyncDataArray,
runSyncJob,
} from './helpers/syncHelpers';
import { strictAssert } from '../util/assert';
import { isRecord } from '../util/isRecord';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
const MAX_RETRY_TIME = durations.DAY;
export type ViewOnceOpenJobData = {
viewOnceOpens: Array<SyncType>;
};
export class ViewOnceOpenJobQueue extends JobQueue<ViewOnceOpenJobData> {
protected parseData(data: unknown): ViewOnceOpenJobData {
strictAssert(isRecord(data), 'data is not an object');
return { viewOnceOpens: parseRawSyncDataArray(data.viewOnceOpens) };
}
protected async run(
{
data,
timestamp,
}: Readonly<{ data: ViewOnceOpenJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
await runSyncJob({
attempt,
log,
maxRetryTime: MAX_RETRY_TIME,
syncs: data.viewOnceOpens,
timestamp,
type: SyncTypeList.ViewOnceOpen,
});
}
}
export const viewOnceOpenJobQueue = new ViewOnceOpenJobQueue({
store: jobQueueDatabaseStore,
queueType: 'view once open sync',
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
});

View File

@ -1,14 +1,15 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as durations from '../util/durations';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import type { SyncType } from './helpers/readAndViewSyncHelpers';
import type { SyncType } from './helpers/syncHelpers';
import {
SyncTypeList,
parseRawSyncDataArray,
runReadOrViewSyncJob,
} from './helpers/readAndViewSyncHelpers';
runSyncJob,
} from './helpers/syncHelpers';
import { strictAssert } from '../util/assert';
import { isRecord } from '../util/isRecord';
@ -31,13 +32,13 @@ export class ViewSyncJobQueue extends JobQueue<ViewSyncJobData> {
{ data, timestamp }: Readonly<{ data: ViewSyncJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
await runReadOrViewSyncJob({
await runSyncJob({
attempt,
isView: true,
log,
maxRetryTime: MAX_RETRY_TIME,
syncs: data.viewSyncs,
timestamp,
type: SyncTypeList.View,
});
}
}

View File

@ -143,6 +143,7 @@ import {
isQuoteAMatch,
} from '../messages/helpers';
import type { ReplacementValuesType } from '../types/I18N';
import { viewOnceOpenJobQueue } from '../jobs/viewOnceOpenJobQueue';
/* eslint-disable camelcase */
/* eslint-disable more/no-then */
@ -843,20 +844,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
await this.eraseContents();
if (!fromSync) {
const sender = getSource(this.attributes);
const senderE164 = getSource(this.attributes);
const senderUuid = getSourceUuid(this.attributes);
const timestamp = this.get('sent_at');
if (senderUuid === undefined) {
throw new Error('senderUuid is undefined');
throw new Error('markViewOnceMessageViewed: senderUuid is undefined');
}
const timestamp = this.get('sent_at');
const ourConversation =
window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
if (window.ConversationController.areWePrimaryDevice()) {
log.warn(
'markViewOnceMessageViewed: We are primary device; not sending view once open sync'
@ -864,15 +859,22 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
return;
}
await handleMessageSend(
window.textsecure.messaging.syncViewOnceOpen(
sender,
senderUuid,
timestamp,
sendOptions
),
{ messageIds: [this.id], sendType: 'viewOnceSync' }
);
try {
await viewOnceOpenJobQueue.add({
viewOnceOpens: [
{
senderE164,
senderUuid,
timestamp,
},
],
});
} catch (error) {
log.error(
'markViewOnceMessageViewed: Failed to queue view once open sync',
Errors.toLogFormat(error)
);
}
}
}

View File

@ -1,9 +1,9 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { parseRawSyncDataArray } from '../../../jobs/helpers/readAndViewSyncHelpers';
import { parseRawSyncDataArray } from '../../../jobs/helpers/syncHelpers';
describe('read and view sync helpers', () => {
describe('parseRawSyncDataArray', () => {

View File

@ -1363,18 +1363,31 @@ export default class MessageSender {
}
async syncViewOnceOpen(
sender: string | undefined,
senderUuid: string,
timestamp: number,
viewOnceOpens: ReadonlyArray<{
senderUuid?: string;
senderE164?: string;
timestamp: number;
}>,
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
if (viewOnceOpens.length !== 1) {
throw new Error(
`syncViewOnceOpen: ${viewOnceOpens.length} opens provided. Can only handle one.`
);
}
const { senderE164, senderUuid, timestamp } = viewOnceOpens[0];
if (!senderUuid) {
throw new Error('syncViewOnceOpen: Missing senderUuid');
}
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const syncMessage = this.createSyncMessage();
const viewOnceOpen = new Proto.SyncMessage.ViewOnceOpen();
if (sender !== undefined) {
viewOnceOpen.sender = sender;
if (senderE164 !== undefined) {
viewOnceOpen.sender = senderE164;
}
viewOnceOpen.senderUuid = senderUuid;
viewOnceOpen.timestamp = timestamp;
@ -1862,8 +1875,12 @@ export default class MessageSender {
}
async getSenderKeyDistributionMessage(
distributionId: string
): Promise<SenderKeyDistributionMessage> {
distributionId: string,
{
throwIfNotInDatabase,
timestamp,
}: { throwIfNotInDatabase?: boolean; timestamp: number }
): Promise<Proto.Content> {
const ourUuid = window.textsecure.storage.user.getCheckedUuid();
const ourDeviceId = parseIntOrThrow(
window.textsecure.storage.user.getDeviceId(),
@ -1878,17 +1895,41 @@ export default class MessageSender {
ourUuid,
new Address(ourUuid, ourDeviceId)
);
const senderKeyStore = new SenderKeys({ ourUuid, zone: GLOBAL_ZONE });
return window.textsecure.storage.protocol.enqueueSenderKeyJob(
address,
async () =>
SenderKeyDistributionMessage.create(
protocolAddress,
distributionId,
senderKeyStore
)
const senderKeyDistributionMessage =
await window.textsecure.storage.protocol.enqueueSenderKeyJob(
address,
async () => {
const senderKeyStore = new SenderKeys({ ourUuid, zone: GLOBAL_ZONE });
if (throwIfNotInDatabase) {
const key = await senderKeyStore.getSenderKey(
protocolAddress,
distributionId
);
if (!key) {
throw new Error(
`getSenderKeyDistributionMessage: Distribution ${distributionId} was not in database as expected`
);
}
}
return SenderKeyDistributionMessage.create(
protocolAddress,
distributionId,
senderKeyStore
);
}
);
log.info(
`getSenderKeyDistributionMessage: Building ${distributionId} with timestamp ${timestamp}`
);
const contentMessage = new Proto.Content();
contentMessage.senderKeyDistributionMessage =
senderKeyDistributionMessage.serialize();
return contentMessage;
}
// The one group send exception - a message that should never be sent via sender key
@ -1898,25 +1939,25 @@ export default class MessageSender {
distributionId,
groupId,
identifiers,
throwIfNotInDatabase,
}: Readonly<{
contentHint: number;
distributionId: string;
groupId: string | undefined;
identifiers: ReadonlyArray<string>;
throwIfNotInDatabase?: boolean;
}>,
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
const contentMessage = new Proto.Content();
const timestamp = Date.now();
log.info(
`sendSenderKeyDistributionMessage: Sending ${distributionId} with timestamp ${timestamp}`
const contentMessage = await this.getSenderKeyDistributionMessage(
distributionId,
{
throwIfNotInDatabase,
timestamp,
}
);
const senderKeyDistributionMessage =
await this.getSenderKeyDistributionMessage(distributionId);
contentMessage.senderKeyDistributionMessage =
senderKeyDistributionMessage.serialize();
const sendLogCallback =
identifiers.length > 1
? this.makeSendLogCallback({

View File

@ -7,6 +7,7 @@ import {
} from '@signalapp/signal-client';
import { isNumber } from 'lodash';
import * as Bytes from '../Bytes';
import { isProduction } from './version';
import { strictAssert } from './assert';
import { getSendOptions } from './getSendOptions';
@ -31,10 +32,14 @@ import type {
import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
const RETRY_LIMIT = 5;
// Note: Neither of the the two functions onRetryRequest and onDecrytionError use a job
// queue to make sure sends are reliable. That's unnecessary because these tasks are
// tied to incoming message processing queue, and will only confirm() completion on
// successful send.
// Entrypoints
const retryRecord = new Map<number, number>();
@ -128,6 +133,7 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
messageIds,
requestGroupId,
requesterUuid,
timestamp,
});
const recipientConversation = window.ConversationController.getOrCreate(
@ -252,6 +258,7 @@ async function sendDistributionMessageOrNullMessage(
options: RetryRequestEventData,
didArchive: boolean
): Promise<void> {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
const { groupId, requesterUuid } = options;
let sentDistributionMessage = false;
log.info(`sendDistributionMessageOrNullMessage/${logId}: Starting...`);
@ -260,6 +267,7 @@ async function sendDistributionMessageOrNullMessage(
requesterUuid,
'private'
);
const sendOptions = await getSendOptions(conversation.attributes);
if (groupId) {
const group = window.ConversationController.get(groupId);
@ -277,20 +285,19 @@ async function sendDistributionMessageOrNullMessage(
);
try {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
const result = await handleMessageSend(
window.textsecure.messaging.sendSenderKeyDistributionMessage({
contentHint: ContentHint.RESENDABLE,
distributionId,
groupId,
identifiers: [requesterUuid],
}),
await handleMessageSend(
window.textsecure.messaging.sendSenderKeyDistributionMessage(
{
contentHint: ContentHint.RESENDABLE,
distributionId,
groupId,
identifiers: [requesterUuid],
throwIfNotInDatabase: true,
},
sendOptions
),
{ messageIds: [], sendType: 'senderKeyDistributionMessage' }
);
if (result && result.errors && result.errors.length > 0) {
throw result.errors[0];
}
sentDistributionMessage = true;
} catch (error) {
log.error(
@ -315,14 +322,23 @@ async function sendDistributionMessageOrNullMessage(
// Enqueue a null message using the newly-created session
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getNullMessage({
uuid: requesterUuid,
})
const nullMessage = window.textsecure.messaging.getNullMessage({
uuid: requesterUuid,
});
await handleMessageSend(
window.textsecure.messaging.sendIndividualProto({
...nullMessage,
options: sendOptions,
proto: Proto.Content.decode(
Bytes.fromBase64(nullMessage.protoBase64)
),
timestamp: Date.now(),
}),
{ messageIds: [], sendType: nullMessage.type }
);
} catch (error) {
log.error(
'sendDistributionMessageOrNullMessage: Failed to queue null message',
'sendDistributionMessageOrNullMessage: Failed to send null message',
Errors.toLogFormat(error)
);
}
@ -363,12 +379,14 @@ async function maybeAddSenderKeyDistributionMessage({
messageIds,
requestGroupId,
requesterUuid,
timestamp,
}: {
contentProto: Proto.IContent;
logId: string;
messageIds: Array<string>;
requestGroupId?: string;
requesterUuid: string;
timestamp: number;
}): Promise<{
contentProto: Proto.IContent;
groupId?: string;
@ -402,15 +420,17 @@ async function maybeAddSenderKeyDistributionMessage({
const senderKeyInfo = conversation.get('senderKeyInfo');
if (senderKeyInfo && senderKeyInfo.distributionId) {
const senderKeyDistributionMessage =
const protoWithDistributionMessage =
await window.textsecure.messaging.getSenderKeyDistributionMessage(
senderKeyInfo.distributionId
senderKeyInfo.distributionId,
{ throwIfNotInDatabase: true, timestamp }
);
return {
contentProto: {
...contentProto,
senderKeyDistributionMessage: senderKeyDistributionMessage.serialize(),
senderKeyDistributionMessage:
protoWithDistributionMessage.senderKeyDistributionMessage,
},
groupId: conversation.get('groupId'),
};

View File

@ -7444,6 +7444,13 @@
"reasonCategory": "falseMatch",
"updated": "2020-07-21T18:34:59.251Z"
},
{
"rule": "React-useRef",
"path": "ts/components/LeftPaneMainSearchInput.tsx",
"line": " const inputRef = useRef<HTMLInputElement | null>(null);",
"reasonCategory": "usageTrusted",
"updated": "2022-01-26T23:11:05.369Z"
},
{
"rule": "React-useRef",
"path": "ts/components/LeftPaneSearchInput.tsx",
@ -7487,13 +7494,6 @@
"reasonCategory": "usageTrusted",
"updated": "2021-10-11T21:21:08.188Z"
},
{
"rule": "React-useRef",
"path": "ts/components/LeftPaneMainSearchInput.tsx",
"line": " const inputRef = useRef<HTMLInputElement | null>(null);",
"reasonCategory": "usageTrusted",
"updated": "2022-01-26T23:11:05.369Z"
},
{
"rule": "React-useRef",
"path": "ts/components/Modal.tsx",
@ -7879,17 +7879,17 @@
},
{
"rule": "jQuery-load(",
"path": "ts/jobs/helpers/readAndViewSyncHelpers.js",
"path": "ts/jobs/helpers/syncHelpers.js",
"line": " await window.ConversationController.load();",
"reasonCategory": "falseMatch",
"updated": "2021-12-15T19:58:28.089Z"
},
{
"rule": "jQuery-load(",
"path": "ts/jobs/helpers/readAndViewSyncHelpers.ts",
"path": "ts/jobs/helpers/syncHelpers.ts",
"line": " await window.ConversationController.load();",
"reasonCategory": "falseMatch",
"updated": "2021-12-15T19:58:28.089Z"
"updated": "2021-11-04T16:14:03.477Z"
},
{
"rule": "jQuery-load(",
@ -7903,7 +7903,7 @@
"path": "ts/jobs/normalMessageSendJobQueue.ts",
"line": " await window.ConversationController.load();",
"reasonCategory": "falseMatch",
"updated": "2021-11-04T16:14:03.477Z"
"updated": "2021-12-15T19:58:28.089Z"
},
{
"rule": "jQuery-load(",
@ -8402,4 +8402,4 @@
"reasonCategory": "usageTrusted",
"updated": "2021-09-17T21:02:59.414Z"
}
]
]