Move a number of sync messages to jobs for retry

This commit is contained in:
Scott Nonnenberg 2022-01-14 13:34:52 -08:00 committed by GitHub
parent 74aaf7819a
commit 90356d4c0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 501 additions and 373 deletions

View File

@ -6630,7 +6630,7 @@
"description": "A title of the dialog displayed when starting an application after a recent crash"
},
"CrashReportDialog__body": {
"message": "Signal restarted after a crash. You can submit a crash a report to help Signal investigate the issue.",
"message": "Signal restarted after a crash. You can submit a crash report to help Signal investigate the issue.",
"description": "The body of the dialog displayed when starting an application after a recent crash"
},
"CrashReportDialog__submit": {

View File

@ -1,4 +1,4 @@
// Copyright 2016-2021 Signal Messenger, LLC
// Copyright 2016-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue';
@ -18,7 +18,6 @@ import {
import * as Bytes from './Bytes';
import { constantTimeEqual } from './Crypto';
import { assert, strictAssert } from './util/assert';
import { handleMessageSend } from './util/handleMessageSend';
import { isNotNil } from './util/isNotNil';
import { Zone } from './util/Zone';
import { isMoreRecentThan } from './util/timestamp';
@ -44,7 +43,6 @@ import type {
UnprocessedType,
UnprocessedUpdateType,
} from './textsecure/Types.d';
import { getSendOptions } from './util/getSendOptions';
import type { RemoveAllConfiguration } from './types/RemoveAllConfiguration';
import type { UUIDStringType } from './types/UUID';
import { UUID } from './types/UUID';
@ -52,6 +50,8 @@ import type { Address } from './types/Address';
import type { QualifiedAddressStringType } from './types/QualifiedAddress';
import { QualifiedAddress } from './types/QualifiedAddress';
import * as log from './logging/log';
import { singleProtoJobQueue } from './jobs/singleProtoJobQueue';
import * as Errors from './types/errors';
const TIMESTAMP_THRESHOLD = 5 * 1000; // 5 seconds
@ -1384,29 +1384,22 @@ export class SignalProtocolStore extends EventsMixin {
// Archive open session with this device
await this.archiveSession(qualifiedAddress);
// Send a null message with newly-created session
const sendOptions = await getSendOptions(conversation.attributes);
const result = await handleMessageSend(
window.textsecure.messaging.sendNullMessage(
{
uuid: uuid.toString(),
},
sendOptions
),
{ messageIds: [], sendType: 'nullMessage' }
// Enqueue a null message with newly-created session
await singleProtoJobQueue.add(
window.textsecure.messaging.getNullMessage({
uuid: uuid.toString(),
})
);
if (result && result.errors && result.errors.length) {
throw result.errors[0];
}
} catch (error) {
// If we failed to do the session reset, then we'll allow another attempt sooner
// than one hour from now.
delete sessionResets[id];
window.storage.put('sessionResets', sessionResets);
const errorString = error && error.stack ? error.stack : error;
log.error(`lightSessionReset/${id}: Encountered error`, errorString);
log.error(
`lightSessionReset/${id}: Encountered error`,
Errors.toLogFormat(error)
);
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2020-2021 Signal Messenger, LLC
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { webFrame } from 'electron';
@ -89,9 +89,7 @@ import type { WebAPIType } from './textsecure/WebAPI';
import * as KeyChangeListener from './textsecure/KeyChangeListener';
import { RotateSignedPreKeyListener } from './textsecure/RotateSignedPreKeyListener';
import { isDirectConversation, isGroupV2 } from './util/whatTypeOfConversation';
import { getSendOptions } from './util/getSendOptions';
import { BackOff, FIBONACCI_TIMEOUTS } from './util/BackOff';
import { handleMessageSend } from './util/handleMessageSend';
import { AppViewType } from './state/ducks/app';
import { UsernameSaveState } from './state/ducks/conversationsEnums';
import type { BadgesStateType } from './state/ducks/badges';
@ -144,6 +142,7 @@ import { startInteractionMode } from './windows/startInteractionMode';
import { deliveryReceiptsJobQueue } from './jobs/deliveryReceiptsJobQueue';
import { updateOurUsername } from './util/updateOurUsername';
import { ReactionSource } from './reactions/ReactionSource';
import { singleProtoJobQueue } from './jobs/singleProtoJobQueue';
const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000;
@ -1607,7 +1606,7 @@ export async function startApp(): Promise<void> {
unlinkAndDisconnect(RemoveAllConfiguration.Full);
});
function runStorageService() {
async function runStorageService() {
window.Signal.Services.enableStorageService();
if (window.ConversationController.areWePrimaryDevice()) {
@ -1617,10 +1616,16 @@ export async function startApp(): Promise<void> {
return;
}
handleMessageSend(window.textsecure.messaging.sendRequestKeySyncMessage(), {
messageIds: [],
sendType: 'otherSync',
});
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getRequestKeySyncMessage()
);
} catch (error) {
log.error(
'runStorageService: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}
let challengeHandler: ChallengeHandler | undefined;
@ -1865,10 +1870,16 @@ export async function startApp(): Promise<void> {
return;
}
await handleMessageSend(
window.textsecure.messaging.sendRequestKeySyncMessage(),
{ messageIds: [], sendType: 'otherSync' }
);
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getRequestKeySyncMessage()
);
} catch (error) {
log.error(
'desktop.storage/onChange: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}
);
@ -2196,12 +2207,6 @@ export async function startApp(): Promise<void> {
runStorageService();
});
const ourConversation =
window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
const installedStickerPacks = Stickers.getInstalledStickerPacks();
if (installedStickerPacks.length) {
const operations = installedStickerPacks.map(pack => ({
@ -2217,18 +2222,16 @@ export async function startApp(): Promise<void> {
return;
}
handleMessageSend(
window.textsecure.messaging.sendStickerPackSync(
operations,
sendOptions
),
{ messageIds: [], sendType: 'otherSync' }
).catch(error => {
log.error(
'Failed to send installed sticker packs via sync message',
error && error.stack ? error.stack : error
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getStickerPackSync(operations)
);
});
} catch (error) {
log.error(
'connect: Failed to queue sticker sync message',
Errors.toLogFormat(error)
);
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isRecord } from '../../util/isRecord';
@ -13,11 +13,13 @@ export function getHttpErrorCode(maybeError: unknown): number {
return -1;
}
// This might be a textsecure/Errors/HTTPError
const maybeTopLevelCode = parseIntWithFallback(maybeError.code, -1);
if (maybeTopLevelCode !== -1) {
return maybeTopLevelCode;
}
// Various errors in textsecure/Errors have a nested httpError property
const { httpError } = maybeError;
if (!isRecord(httpError)) {
return -1;

View File

@ -18,7 +18,7 @@ export async function handleMultipleSendErrors({
errors: ReadonlyArray<unknown>;
isFinalAttempt: boolean;
log: Pick<LoggerType, 'info'>;
markFailed: (() => void) | (() => Promise<void>);
markFailed?: (() => void) | (() => Promise<void>);
timeRemaining: number;
}>): Promise<void> {
strictAssert(errors.length, 'Expected at least one error');
@ -50,7 +50,7 @@ export async function handleMultipleSendErrors({
);
if (isFinalAttempt || serverAskedUsToStop) {
await markFailed();
await markFailed?.();
}
if (serverAskedUsToStop) {

View File

@ -1,4 +1,4 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { WebAPIType } from '../textsecure/WebAPI';
@ -10,6 +10,7 @@ import { readReceiptsJobQueue } from './readReceiptsJobQueue';
import { readSyncJobQueue } from './readSyncJobQueue';
import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue';
import { reportSpamJobQueue } from './reportSpamJobQueue';
import { singleProtoJobQueue } from './singleProtoJobQueue';
import { viewSyncJobQueue } from './viewSyncJobQueue';
import { viewedReceiptsJobQueue } from './viewedReceiptsJobQueue';
@ -30,6 +31,7 @@ export function initializeAllJobQueues({
readSyncJobQueue.streamJobs();
removeStorageKeyJobQueue.streamJobs();
reportSpamJobQueue.streamJobs();
singleProtoJobQueue.streamJobs();
viewSyncJobQueue.streamJobs();
viewedReceiptsJobQueue.streamJobs();
}

View File

@ -0,0 +1,113 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue';
import * as Bytes from '../Bytes';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import type { ParsedJob } from './types';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { DAY } from '../util/durations';
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
import { SignalService as Proto } from '../protobuf';
import { handleMessageSend } from '../util/handleMessageSend';
import { getSendOptions } from '../util/getSendOptions';
import type { SingleProtoJobData } from '../textsecure/SendMessage';
import { singleProtoJobDataSchema } from '../textsecure/SendMessage';
import { handleMultipleSendErrors } from './helpers/handleMultipleSendErrors';
import { SendMessageProtoError } from '../textsecure/Errors';
const MAX_RETRY_TIME = DAY;
const MAX_PARALLEL_JOBS = 5;
const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS });
protected override getInMemoryQueue(
_parsedJob: ParsedJob<SingleProtoJobData>
): PQueue {
return this.parallelQueue;
}
protected parseData(data: unknown): SingleProtoJobData {
return singleProtoJobDataSchema.parse(data);
}
protected async run(
{
data,
timestamp,
}: Readonly<{ data: SingleProtoJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
const shouldContinue = await commonShouldJobContinue({
attempt,
log,
timeRemaining,
});
if (!shouldContinue) {
return;
}
const {
contentHint,
identifier,
isSyncMessage,
messageIds = [],
protoBase64,
type,
} = data;
log.info(
`starting ${type} send to ${identifier} with timestamp ${timestamp}`
);
const conversation = window.ConversationController.get(identifier);
if (!conversation) {
throw new Error(
`Failed to get conversation for identifier ${identifier}`
);
}
const proto = Proto.Content.decode(Bytes.fromBase64(protoBase64));
const options = await getSendOptions(conversation.attributes, {
syncMessage: isSyncMessage,
});
try {
await handleMessageSend(
window.textsecure.messaging.sendIndividualProto({
contentHint,
identifier,
options,
proto,
timestamp,
}),
{ messageIds, sendType: type }
);
} catch (error: unknown) {
const errors =
error instanceof SendMessageProtoError
? error.errors || [error]
: [error];
await handleMultipleSendErrors({
errors,
isFinalAttempt,
log,
timeRemaining,
});
}
}
}
export const singleProtoJobQueue = new SingleProtoJobQueue({
maxAttempts: MAX_ATTEMPTS,
queueType: 'single proto',
store: jobQueueDatabaseStore,
});

View File

@ -106,6 +106,7 @@ import * as log from '../logging/log';
import * as Errors from '../types/errors';
import { isMessageUnread } from '../util/isMessageUnread';
import type { SenderKeyTargetType } from '../util/sendToGroup';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
/* eslint-disable more/no-then */
window.Whisper = window.Whisper || {};
@ -2401,12 +2402,6 @@ export class ConversationModel extends window.Backbone
// server updates were successful.
await this.applyMessageRequestResponse(response);
const ourConversation =
window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
const groupId = this.getGroupIdBuffer();
if (window.ConversationController.areWePrimaryDevice()) {
@ -2417,20 +2412,19 @@ export class ConversationModel extends window.Backbone
}
try {
await handleMessageSend(
window.textsecure.messaging.syncMessageRequestResponse(
{
threadE164: this.get('e164'),
threadUuid: this.get('uuid'),
groupId,
type: response,
},
sendOptions
),
{ messageIds: [], sendType: 'otherSync' }
await singleProtoJobQueue.add(
window.textsecure.messaging.getMessageRequestResponseSync({
threadE164: this.get('e164'),
threadUuid: this.get('uuid'),
groupId,
type: response,
})
);
} catch (error) {
log.error(
'syncMessageRequestResponse: Failed to queue sync message',
Errors.toLogFormat(error)
);
} catch (result) {
this.processSendResponse(result);
}
}
@ -2619,17 +2613,6 @@ export class ConversationModel extends window.Backbone
return;
}
// Because syncVerification sends a (null) message to the target of the verify and
// a sync message to our own devices, we need to send the accessKeys down for both
// contacts. So we merge their sendOptions.
const ourConversation =
window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
const contactSendOptions = await getSendOptions(this.attributes);
const options = { ...sendOptions, ...contactSendOptions };
const key = await window.textsecure.storage.protocol.loadIdentityKey(
UUID.checkedLookup(identifier)
);
@ -2639,16 +2622,21 @@ export class ConversationModel extends window.Backbone
);
}
await handleMessageSend(
window.textsecure.messaging.syncVerification(
e164,
uuid.toString(),
state,
key,
options
),
{ messageIds: [], sendType: 'verificationSync' }
);
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getVerificationSync(
e164,
uuid.toString(),
state,
key
)
);
} catch (error) {
log.error(
'sendVerifySyncMessage: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}
isVerified(): boolean {

View File

@ -1,4 +1,4 @@
// Copyright 2020-2021 Signal Messenger, LLC
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { debounce, isNumber } from 'lodash';
@ -27,7 +27,6 @@ import type { ConversationModel } from '../models/conversations';
import { strictAssert } from '../util/assert';
import * as durations from '../util/durations';
import { BackOff } from '../util/BackOff';
import { handleMessageSend } from '../util/handleMessageSend';
import { storageJobQueue } from '../util/JobQueue';
import { sleep } from '../util/sleep';
import { isMoreRecentThan } from '../util/timestamp';
@ -39,6 +38,8 @@ import {
} from '../util/whatTypeOfConversation';
import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
import * as Errors from '../types/errors';
type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier;
@ -553,15 +554,21 @@ async function uploadManifest(
if (window.ConversationController.areWePrimaryDevice()) {
log.warn(
'uploadManifest: We are primary device; not sending sync manifest'
'storageService.uploadManifest: We are primary device; not sending sync manifest'
);
return;
}
await handleMessageSend(
window.textsecure.messaging.sendFetchManifestSyncMessage(),
{ messageIds: [], sendType: 'otherSync' }
);
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getFetchManifestSyncMessage()
);
} catch (error) {
log.error(
'storageService.uploadManifest: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}
async function stopStorageServiceSync() {
@ -578,7 +585,7 @@ async function stopStorageServiceSync() {
await sleep(backOff.getAndIncrement());
log.info('storageService.stopStorageServiceSync: requesting new keys');
setTimeout(() => {
setTimeout(async () => {
if (!window.textsecure.messaging) {
throw new Error('storageService.stopStorageServiceSync: We are offline!');
}
@ -589,11 +596,16 @@ async function stopStorageServiceSync() {
);
return;
}
handleMessageSend(window.textsecure.messaging.sendRequestKeySyncMessage(), {
messageIds: [],
sendType: 'otherSync',
});
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getRequestKeySyncMessage()
);
} catch (error) {
log.error(
'storageService.stopStorageServiceSync: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
});
}
@ -1118,14 +1130,23 @@ async function upload(fromSync = false): Promise<void> {
backOff.reset();
if (window.ConversationController.areWePrimaryDevice()) {
log.warn('upload: We are primary device; not sending key sync request');
log.warn(
'storageService.upload: We are primary device; not sending key sync request'
);
return;
}
await handleMessageSend(
window.textsecure.messaging.sendRequestKeySyncMessage(),
{ messageIds: [], sendType: 'otherSync' }
);
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getRequestKeySyncMessage()
);
} catch (error) {
log.error(
'storageService.upload: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
return;
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import dataInterface from '../sql/Client';
@ -6,7 +6,9 @@ import type { ConversationType } from '../state/ducks/conversations';
import { computeHash } from '../Crypto';
import { encryptProfileData } from '../util/encryptProfileData';
import { getProfile } from '../util/getProfile';
import { handleMessageSend } from '../util/handleMessageSend';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
import * as Errors from '../types/errors';
import * as log from '../logging/log';
export async function writeProfile(
conversation: ConversationType,
@ -85,8 +87,14 @@ export async function writeProfile(
dataInterface.updateConversation(model.attributes);
model.captureChange('writeProfile');
await handleMessageSend(
window.textsecure.messaging.sendFetchLocalProfileSyncMessage(),
{ messageIds: [], sendType: 'otherSync' }
);
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getFetchLocalProfileSyncMessage()
);
} catch (error) {
log.error(
'writeProfile: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}

View File

@ -1,9 +1,11 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
import dataInterface from '../sql/Client';
import { handleMessageSend } from '../util/handleMessageSend';
import { updateOurUsername } from '../util/updateOurUsername';
import * as Errors from '../types/errors';
import * as log from '../logging/log';
export async function writeUsername({
username,
@ -32,8 +34,14 @@ export async function writeUsername({
dataInterface.updateConversation(me.attributes);
await handleMessageSend(
window.textsecure.messaging.sendFetchLocalProfileSyncMessage(),
{ messageIds: [], sendType: 'otherSync' }
);
try {
await singleProtoJobQueue.add(
window.textsecure.messaging.getFetchLocalProfileSyncMessage()
);
} catch (error) {
log.error(
'writeUsername: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}

View File

@ -1,20 +1,16 @@
// Copyright 2019-2021 Signal Messenger, LLC
// Copyright 2019-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { handleMessageSend } from '../util/handleMessageSend';
import { getSendOptions } from '../util/getSendOptions';
import * as log from '../logging/log';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
import * as Errors from '../types/errors';
export async function sendStickerPackSync(
packId: string,
packKey: string,
installed: boolean
): Promise<void> {
const { ConversationController, textsecure } = window;
const ourConversation = ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
const { textsecure } = window;
if (!textsecure.messaging) {
log.error(
@ -31,22 +27,20 @@ export async function sendStickerPackSync(
return;
}
handleMessageSend(
textsecure.messaging.sendStickerPackSync(
[
try {
await singleProtoJobQueue.add(
textsecure.messaging.getStickerPackSync([
{
packId,
packKey,
installed,
},
],
sendOptions
),
{ messageIds: [], sendType: 'otherSync' }
).catch(error => {
log.error(
'shim: Error calling sendStickerPackSync:',
error && error.stack ? error.stack : error
])
);
});
} catch (error) {
log.error(
'sendStickerPackSync: Failed to queue sync message',
Errors.toLogFormat(error)
);
}
}

View File

@ -4096,7 +4096,6 @@ async function removeAll(): Promise<void> {
DELETE FROM identityKeys;
DELETE FROM items;
DELETE FROM jobs;
DELETE FROM jobs;
DELETE FROM messages_fts;
DELETE FROM messages;
DELETE FROM preKeys;

View File

@ -3,7 +3,7 @@
import { assert } from 'chai';
import * as sinon from 'sinon';
import { noop } from 'lodash';
import { noop, omit } from 'lodash';
import { HTTPError } from '../../../textsecure/Errors';
import { SECOND } from '../../../util/durations';
@ -63,6 +63,17 @@ describe('handleMultipleSendErrors', () => {
sinon.assert.calledOnceWithExactly(markFailed);
});
it("doesn't require `markFailed`", async () => {
await assert.isRejected(
handleMultipleSendErrors({
...omit(defaultOptions, 'markFailed'),
errors: [new Error('Test message')],
isFinalAttempt: true,
}),
'Test message'
);
});
describe('413 handling', () => {
it('sleeps for the longest 413 Retry-After time', async () => {
let done = false;

View File

@ -1,4 +1,4 @@
// Copyright 2020-2021 Signal Messenger, LLC
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable guard-for-in */
@ -53,12 +53,10 @@ export type SendLogCallbackType = (options: {
deviceIds: Array<number>;
}) => Promise<void>;
export const serializedCertificateSchema = z
.object({
expires: z.number().optional(),
serialized: z.instanceof(Uint8Array),
})
.nonstrict();
export const serializedCertificateSchema = z.object({
expires: z.number().optional(),
serialized: z.instanceof(Uint8Array),
});
export type SerializedCertificateType = z.infer<
typeof serializedCertificateSchema

View File

@ -1,4 +1,4 @@
// Copyright 2020-2021 Signal Messenger, LLC
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-nested-ternary */
@ -6,6 +6,7 @@
/* eslint-disable no-bitwise */
/* eslint-disable max-classes-per-file */
import { z } from 'zod';
import type { Dictionary } from 'lodash';
import Long from 'long';
import PQueue from 'p-queue';
@ -62,7 +63,7 @@ import type {
} from '../linkPreviews/linkPreviewFetch';
import { concat, isEmpty, map } from '../util/iterables';
import type { SendTypesType } from '../util/handleMessageSend';
import { handleMessageSend, shouldSaveProto } from '../util/handleMessageSend';
import { shouldSaveProto, sendTypesEnum } from '../util/handleMessageSend';
import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
@ -139,6 +140,17 @@ export type AttachmentType = {
blurHash?: string;
};
export const singleProtoJobDataSchema = z.object({
contentHint: z.number(),
identifier: z.string(),
isSyncMessage: z.boolean(),
messageIds: z.array(z.string()).optional(),
protoBase64: z.string(),
type: sendTypesEnum,
});
export type SingleProtoJobData = z.infer<typeof singleProtoJobDataSchema>;
function makeAttachmentSendReady(
attachment: Attachment.AttachmentType
): AttachmentType | undefined {
@ -956,17 +968,17 @@ export default class MessageSender {
}
async sendIndividualProto({
contentHint,
identifier,
options,
proto,
timestamp,
contentHint,
options,
}: Readonly<{
contentHint: number;
identifier: string | undefined;
options?: SendOptionsType;
proto: Proto.DataMessage | Proto.Content | PlaintextContent;
timestamp: number;
contentHint: number;
options?: SendOptionsType;
}>): Promise<CallbackResultType> {
assert(identifier, "Identifier can't be undefined");
return new Promise((resolve, reject) => {
@ -1087,7 +1099,7 @@ export default class MessageSender {
sentMessage.isRecipientUpdate = true;
}
// Though this field has 'unidenified' in the name, it should have entries for each
// Though this field has 'unidentified' in the name, it should have entries for each
// number we sent to.
if (!isEmpty(conversationIdsSentTo)) {
sentMessage.unidentifiedStatus = [
@ -1128,9 +1140,7 @@ export default class MessageSender {
});
}
async sendRequestBlockSyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getRequestBlockSyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const request = new Proto.SyncMessage.Request();
@ -1142,18 +1152,18 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'blockSyncRequest',
};
}
async sendRequestConfigurationSyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getRequestConfigurationSyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const request = new Proto.SyncMessage.Request();
@ -1165,18 +1175,18 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'configurationSyncRequest',
};
}
async sendRequestGroupSyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getRequestGroupSyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const request = new Proto.SyncMessage.Request();
@ -1188,18 +1198,18 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'groupSyncRequest',
};
}
async sendRequestContactSyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getRequestContactSyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const request = new Proto.SyncMessage.Request();
@ -1211,18 +1221,18 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'contactSyncRequest',
};
}
async sendFetchManifestSyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getFetchManifestSyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const fetchLatest = new Proto.SyncMessage.FetchLatest();
@ -1235,18 +1245,18 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'fetchLatestManifestSync',
};
}
async sendFetchLocalProfileSyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getFetchLocalProfileSyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const fetchLatest = new Proto.SyncMessage.FetchLatest();
@ -1259,18 +1269,18 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'fetchLocalProfileSync',
};
}
async sendRequestKeySyncMessage(
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getRequestKeySyncMessage(): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const request = new Proto.SyncMessage.Request();
@ -1283,13 +1293,15 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'keySyncRequest',
};
}
async syncReadMessages(
@ -1381,30 +1393,29 @@ export default class MessageSender {
});
}
async syncMessageRequestResponse(
responseArgs: Readonly<{
getMessageRequestResponseSync(
options: Readonly<{
threadE164?: string;
threadUuid?: string;
groupId?: Uint8Array;
type: number;
}>,
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
}>
): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const syncMessage = this.createSyncMessage();
const response = new Proto.SyncMessage.MessageRequestResponse();
if (responseArgs.threadE164 !== undefined) {
response.threadE164 = responseArgs.threadE164;
if (options.threadE164 !== undefined) {
response.threadE164 = options.threadE164;
}
if (responseArgs.threadUuid !== undefined) {
response.threadUuid = responseArgs.threadUuid;
if (options.threadUuid !== undefined) {
response.threadUuid = options.threadUuid;
}
if (responseArgs.groupId) {
response.groupId = responseArgs.groupId;
if (options.groupId) {
response.groupId = options.groupId;
}
response.type = responseArgs.type;
response.type = options.type;
syncMessage.messageRequestResponse = response;
const contentMessage = new Proto.Content();
@ -1412,23 +1423,24 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
return {
contentHint: ContentHint.RESENDABLE,
options,
});
identifier: myUuid.toString(),
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'messageRequestSync',
};
}
async sendStickerPackSync(
getStickerPackSync(
operations: ReadonlyArray<{
packId: string;
packKey: string;
installed: boolean;
}>,
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
}>
): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const ENUM = Proto.SyncMessage.StickerPackOperation.Type;
@ -1451,44 +1463,31 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier: myUuid.toString(),
proto: contentMessage,
timestamp: Date.now(),
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'stickerPackSync',
};
}
async syncVerification(
getVerificationSync(
destinationE164: string | undefined,
destinationUuid: string | undefined,
state: number,
identityKey: Readonly<Uint8Array>,
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
identityKey: Readonly<Uint8Array>
): SingleProtoJobData {
const myUuid = window.textsecure.storage.user.getCheckedUuid();
const now = Date.now();
if (!destinationE164 && !destinationUuid) {
throw new Error('syncVerification: Neither e164 nor UUID were provided');
}
// Get padding which we can share between null message and verified sync
const padding = this.getRandomPadding();
// First send a null message to mask the sync message.
await handleMessageSend(
this.sendNullMessage(
{ uuid: destinationUuid, e164: destinationE164, padding },
options
),
{
messageIds: [],
sendType: 'nullMessage',
}
);
const verified = new Proto.Verified();
verified.state = state;
if (destinationE164) {
@ -1503,18 +1502,20 @@ export default class MessageSender {
const syncMessage = this.createSyncMessage();
syncMessage.verified = verified;
const secondMessage = new Proto.Content();
secondMessage.syncMessage = syncMessage;
const contentMessage = new Proto.Content();
contentMessage.syncMessage = syncMessage;
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendIndividualProto({
identifier: myUuid.toString(),
proto: secondMessage,
timestamp: now,
return {
contentHint: ContentHint.RESENDABLE,
options,
});
identifier: myUuid.toString(),
isSyncMessage: true,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'verificationSync',
};
}
// Sending messages to contacts
@ -1542,7 +1543,7 @@ export default class MessageSender {
}
: {}),
},
contentHint: ContentHint.IMPLICIT,
contentHint: ContentHint.RESENDABLE,
groupId: undefined,
options,
});
@ -1650,14 +1651,15 @@ export default class MessageSender {
});
}
async sendNullMessage(
{
uuid,
e164,
padding,
}: Readonly<{ uuid?: string; e164?: string; padding?: Uint8Array }>,
options?: Readonly<SendOptionsType>
): Promise<CallbackResultType> {
getNullMessage({
uuid,
e164,
padding,
}: Readonly<{
uuid?: string;
e164?: string;
padding?: Uint8Array;
}>): SingleProtoJobData {
const nullMessage = new Proto.NullMessage();
const identifier = uuid || e164;
@ -1672,15 +1674,15 @@ export default class MessageSender {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
// We want the NullMessage to look like a normal outgoing message
const timestamp = Date.now();
return this.sendIndividualProto({
return {
contentHint: ContentHint.RESENDABLE,
identifier,
proto: contentMessage,
timestamp,
contentHint: ContentHint.IMPLICIT,
options,
});
isSyncMessage: false,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'nullMessage',
};
}
async sendExpirationTimerUpdateToIdentifier(

View File

@ -1,4 +1,4 @@
// Copyright 2020-2021 Signal Messenger, LLC
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable more/no-then */
@ -12,9 +12,9 @@ import MessageReceiver from './MessageReceiver';
import type { ContactSyncEvent, GroupSyncEvent } from './messageReceiverEvents';
import MessageSender from './SendMessage';
import { assert } from '../util/assert';
import { getSendOptions } from '../util/getSendOptions';
import { handleMessageSend } from '../util/handleMessageSend';
import * as log from '../logging/log';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
import * as Errors from '../types/errors';
class SyncRequestInner extends EventTarget {
private started = false;
@ -65,47 +65,28 @@ class SyncRequestInner extends EventTarget {
const { sender } = this;
const ourConversation =
window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
if (window.ConversationController.areWePrimaryDevice()) {
log.warn('SyncRequest.start: We are primary device; returning early');
return;
}
log.info('SyncRequest created. Sending config sync request...');
handleMessageSend(sender.sendRequestConfigurationSyncMessage(sendOptions), {
messageIds: [],
sendType: 'otherSync',
});
log.info(
'SyncRequest created. Sending config, block, contact, and group requests...'
);
try {
await Promise.all([
singleProtoJobQueue.add(sender.getRequestConfigurationSyncMessage()),
singleProtoJobQueue.add(sender.getRequestBlockSyncMessage()),
singleProtoJobQueue.add(sender.getRequestContactSyncMessage()),
singleProtoJobQueue.add(sender.getRequestGroupSyncMessage()),
]);
} catch (error: unknown) {
log.error(
'SyncRequest: Failed to add request jobs',
Errors.toLogFormat(error)
);
}
log.info('SyncRequest now sending block sync request...');
handleMessageSend(sender.sendRequestBlockSyncMessage(sendOptions), {
messageIds: [],
sendType: 'otherSync',
});
log.info('SyncRequest now sending contact sync message...');
handleMessageSend(sender.sendRequestContactSyncMessage(sendOptions), {
messageIds: [],
sendType: 'otherSync',
})
.then(() => {
log.info('SyncRequest now sending group sync message...');
return handleMessageSend(
sender.sendRequestGroupSyncMessage(sendOptions),
{ messageIds: [], sendType: 'otherSync' }
);
})
.catch((error: Error) => {
log.error(
'SyncRequest error:',
error && error.stack ? error.stack : error
);
});
this.timeout = setTimeout(this.onTimeout.bind(this), this.timeoutMillis);
}

View File

@ -1,6 +1,7 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { z } from 'zod';
import { isNumber } from 'lodash';
import type { CallbackResultType } from '../textsecure/Types.d';
import dataInterface from '../sql/Client';
@ -15,40 +16,47 @@ export const SEALED_SENDER = {
UNRESTRICTED: 3,
};
export type SendTypesType =
| 'callingMessage' // excluded from send log
| 'deleteForEveryone'
| 'deliveryReceipt'
| 'expirationTimerUpdate'
| 'groupChange'
| 'legacyGroupChange'
| 'message'
| 'nullMessage' // excluded from send log
| 'otherSync'
| 'profileKeyUpdate'
| 'reaction'
| 'readReceipt'
| 'readSync'
| 'resendFromLog' // excluded from send log
| 'resetSession'
| 'retryRequest' // excluded from send log
| 'senderKeyDistributionMessage'
| 'sentSync'
| 'typing' // excluded from send log
| 'verificationSync'
| 'viewOnceSync'
| 'viewSync'
| 'viewedReceipt';
export const sendTypesEnum = z.enum([
'blockSyncRequest',
'callingMessage', // excluded from send log
'configurationSyncRequest',
'contactSyncRequest',
'deleteForEveryone',
'deliveryReceipt',
'expirationTimerUpdate',
'fetchLatestManifestSync',
'fetchLocalProfileSync',
'groupChange',
'groupSyncRequest',
'keySyncRequest',
'legacyGroupChange',
'message',
'messageRequestSync',
'nullMessage',
'profileKeyUpdate',
'reaction',
'readReceipt',
'readSync',
'resendFromLog', // excluded from send log
'resetSession',
'retryRequest', // excluded from send log
'senderKeyDistributionMessage',
'sentSync',
'stickerPackSync',
'typing', // excluded from send log
'verificationSync',
'viewOnceSync',
'viewSync',
'viewedReceipt',
]);
export type SendTypesType = z.infer<typeof sendTypesEnum>;
export function shouldSaveProto(sendType: SendTypesType): boolean {
if (sendType === 'callingMessage') {
return false;
}
if (sendType === 'nullMessage') {
return false;
}
if (sendType === 'resendFromLog') {
return false;
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Signal Messenger, LLC
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import {
@ -19,6 +19,7 @@ import { Address } from '../types/Address';
import { QualifiedAddress } from '../types/QualifiedAddress';
import { ToastDecryptionError } from '../components/ToastDecryptionError';
import { showToast } from './showToast';
import * as Errors from '../types/errors';
import type { ConversationModel } from '../models/conversations';
import type {
@ -30,6 +31,7 @@ import type {
import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue';
const RETRY_LIMIT = 5;
@ -311,22 +313,17 @@ async function sendDistributionMessageOrNullMessage(
`sendDistributionMessageOrNullMessage/${logId}: Did not send distribution message, sending null message`
);
// Enqueue a null message using the newly-created session
try {
const sendOptions = await getSendOptions(conversation.attributes);
const result = await handleMessageSend(
window.textsecure.messaging.sendNullMessage(
{ uuid: requesterUuid },
sendOptions
),
{ messageIds: [], sendType: 'nullMessage' }
await singleProtoJobQueue.add(
window.textsecure.messaging.getNullMessage({
uuid: requesterUuid,
})
);
if (result && result.errors && result.errors.length > 0) {
throw result.errors[0];
}
} catch (error) {
log.error(
`maybeSendDistributionMessage/${logId}: Failed to send null message`,
error && error.stack ? error.stack : error
'sendDistributionMessageOrNullMessage: Failed to queue null message',
Errors.toLogFormat(error)
);
}
}