Introduce new Profile Fetch service, with centralized queueing

This commit is contained in:
Scott Nonnenberg 2022-07-13 17:46:46 -07:00 committed by GitHub
parent 535b466b43
commit bfc56dd175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 821 additions and 548 deletions

View File

@ -13,7 +13,6 @@ import Long from 'long';
import type { ClientZkGroupCipher } from '@signalapp/libsignal-client/zkgroup';
import { v4 as getGuid } from 'uuid';
import LRU from 'lru-cache';
import PQueue from 'p-queue';
import * as log from './logging/log';
import {
getCheckedCredentialsForToday,
@ -1472,10 +1471,6 @@ export async function modifyGroupV2({
let refreshedCredentials = false;
const profileFetchQueue = new PQueue({
concurrency: 3,
});
for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt += 1) {
log.info(`modifyGroupV2/${logId}: Starting attempt ${attempt}`);
try {
@ -1504,8 +1499,8 @@ export async function modifyGroupV2({
}
// eslint-disable-next-line no-await-in-loop
await profileFetchQueue.addAll(
membersMissingCredentials.map(member => () => member.getProfiles())
await Promise.all(
membersMissingCredentials.map(member => member.getProfiles())
);
}
@ -1603,8 +1598,8 @@ export async function modifyGroupV2({
}
// eslint-disable-next-line no-await-in-loop
await profileFetchQueue.addAll(
usingCredentialsFrom.map(member => () => member.getProfiles())
await Promise.all(
usingCredentialsFrom.map(member => member.getProfiles())
);
// Fetch credentials only once
@ -3088,14 +3083,8 @@ async function updateGroup(
`${contactsWithoutProfileKey.length} missing profiles`
);
const profileFetchQueue = new PQueue({
concurrency: 3,
});
profileFetches = profileFetchQueue.addAll(
contactsWithoutProfileKey.map(contact => () => {
const active = contact.getActiveProfileFetch();
return active || contact.getProfiles();
})
profileFetches = Promise.all(
contactsWithoutProfileKey.map(contact => contact.getProfiles())
);
}

14
ts/model-types.d.ts vendored
View File

@ -277,12 +277,12 @@ export type ConversationAttributesType = {
draftBodyRanges?: Array<BodyRangeType>;
draftTimestamp?: number | null;
hideStory?: boolean;
inbox_position: number;
isPinned: boolean;
lastMessageDeletedForEveryone: boolean;
inbox_position?: number;
isPinned?: boolean;
lastMessageDeletedForEveryone?: boolean;
lastMessageStatus?: LastMessageStatus | null;
markedUnread: boolean;
messageCount: number;
markedUnread?: boolean;
messageCount?: number;
messageCountBeforeMessageRequests?: number | null;
messageRequestResponseType?: number;
muteExpiresAt?: number;
@ -297,7 +297,7 @@ export type ConversationAttributesType = {
lastProfile?: ConversationLastProfileType;
quotedMessageId?: string | null;
sealedSender?: unknown;
sentMessageCount: number;
sentMessageCount?: number;
sharedGroupNames?: Array<string>;
id: string;
@ -313,7 +313,7 @@ export type ConversationAttributesType = {
name?: string;
needsStorageServiceSync?: boolean;
needsVerification?: boolean;
profileSharing: boolean;
profileSharing?: boolean;
storageID?: string;
storageVersion?: number;
storageUnknownFields?: string;

View File

@ -3,7 +3,6 @@
import { compact, has, isNumber, throttle, debounce } from 'lodash';
import { batch as batchDispatch } from 'react-redux';
import PQueue from 'p-queue';
import { v4 as generateGuid } from 'uuid';
import type {
@ -227,8 +226,6 @@ export class ConversationModel extends window.Backbone
private isInReduxBatch = false;
private _activeProfileFetch?: Promise<void>;
override defaults(): Partial<ConversationAttributesType> {
return {
unreadCount: 0,
@ -4016,7 +4013,7 @@ export class ConversationModel extends window.Backbone
// with them?
isFromOrAddedByTrustedContact(): boolean {
if (isDirectConversation(this.attributes)) {
return Boolean(this.get('name')) || this.get('profileSharing');
return Boolean(this.get('name')) || Boolean(this.get('profileSharing'));
}
const addedBy = this.get('addedBy');
@ -4583,32 +4580,11 @@ export class ConversationModel extends window.Backbone
const conversations =
this.getMembers() as unknown as Array<ConversationModel>;
const queue = new PQueue({
concurrency: 3,
});
// Convert Promise<void[]> that is returned by addAll() to Promise<void>
const promise = (async () => {
await queue.addAll(
conversations.map(
conversation => () =>
getProfile(conversation.get('uuid'), conversation.get('e164'))
)
);
})();
this._activeProfileFetch = promise;
try {
await promise;
} finally {
if (this._activeProfileFetch === promise) {
this._activeProfileFetch = undefined;
}
}
}
getActiveProfileFetch(): Promise<void> | undefined {
return this._activeProfileFetch;
await Promise.all(
conversations.map(conversation =>
getProfile(conversation.get('uuid'), conversation.get('e164'))
)
);
}
async setEncryptedProfileName(

655
ts/services/profiles.ts Normal file
View File

@ -0,0 +1,655 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type {
ProfileKeyCredentialRequestContext,
ClientZkProfileOperations,
} from '@signalapp/libsignal-client/zkgroup';
import PQueue from 'p-queue';
import type { ConversationModel } from '../models/conversations';
import type {
GetProfileOptionsType,
GetProfileUnauthOptionsType,
} from '../textsecure/WebAPI';
import * as log from '../logging/log';
import * as Errors from '../types/errors';
import * as Bytes from '../Bytes';
import { explodePromise } from '../util/explodePromise';
import { isRecord } from '../util/isRecord';
import { sleep } from '../util/sleep';
import { MINUTE, SECOND } from '../util/durations';
import {
generateProfileKeyCredentialRequest,
generatePNICredentialRequest,
getClientZkProfileOperations,
handleProfileKeyCredential,
handleProfileKeyPNICredential,
} from '../util/zkgroup';
import { isMe } from '../util/whatTypeOfConversation';
import { getUserLanguages } from '../util/userLanguages';
import { parseBadgesFromServer } from '../badges/parseBadgesFromServer';
import { strictAssert } from '../util/assert';
import { findRetryAfterTimeFromError } from '../jobs/helpers/findRetryAfterTimeFromError';
import { SEALED_SENDER } from '../types/SealedSender';
import { HTTPError } from '../textsecure/Errors';
import { Address } from '../types/Address';
import { QualifiedAddress } from '../types/QualifiedAddress';
import { UUIDKind } from '../types/UUID';
import { trimForDisplay, verifyAccessKey, decryptProfile } from '../Crypto';
type JobType = {
resolve: () => void;
reject: (error: Error) => void;
promise: Promise<void>;
startTime: number;
};
// Goals for this service:
// 1. Ensure that when we get a 413 from the server, we stop firing off profile
// fetches for a while.
// 2. Ensure that all existing profile fetches don't hang in this case; to solve this we
// cancel all outstanding requests when we hit a 413, and throw instead of queueing
// something new if we're waiting due to a retry-after. Note: It's no worse than what
// we were doing before, failing all requests and pushing the retry-after time out
// further.
// 3. Require no changes to callers.
// Potential future goals for this problem area:
// - Update all getProfiles() callers; make them resilient to longer delays
// - Keep track of last profile fetch per conversation, reduce unnecessary re-fetches
// - Enforce a maximum profile fetch frequency
// - Don't even attempt jobs when offline
export class ProfileService {
private jobQueue: PQueue;
private jobsByConversationId: Map<string, JobType> = new Map();
private isPaused = false;
constructor(private fetchProfile = doGetProfile) {
this.jobQueue = new PQueue({ concurrency: 3, timeout: MINUTE * 2 });
this.jobsByConversationId = new Map();
log.info('Profile Service initialized');
}
public async get(conversationId: string): Promise<void> {
const preCheckConversation =
window.ConversationController.get(conversationId);
if (!preCheckConversation) {
throw new Error(
`ProfileServices.get: Pre-check conversation ${conversationId} not found`
);
}
if (this.isPaused) {
throw new Error(
`ProfileService.get: Cannot add job to paused queue for conversation ${preCheckConversation.idForLogging()}`
);
}
const existing = this.jobsByConversationId.get(conversationId);
if (existing) {
return existing.promise;
}
const { resolve, reject, promise } = explodePromise<void>();
const jobData = {
promise,
resolve,
reject,
startTime: Date.now(),
};
const job = async () => {
const conversation = window.ConversationController.get(conversationId);
if (!conversation) {
throw new Error(
`ProfileServices.get: Conversation ${conversationId} not found`
);
}
try {
await this.fetchProfile(conversation);
resolve();
} catch (error) {
reject(error);
if (this.isPaused) {
return;
}
if (isRecord(error) && 'code' in error && error.code === 413) {
this.clearAll('got 413 from server');
const time = findRetryAfterTimeFromError(error);
this.pause(time);
}
} finally {
this.jobsByConversationId.delete(conversationId);
const now = Date.now();
const delta = now - jobData.startTime;
if (delta > 30 * SECOND) {
log.warn(
`ProfileServices.get: Job for ${conversation.idForLogging()} finished ${delta}ms after queue`
);
}
}
};
this.jobsByConversationId.set(conversationId, jobData);
this.jobQueue.add(job);
return promise;
}
public clearAll(reason: string): void {
if (this.isPaused) {
log.warn(
`ProfileService.clearAll: Already paused; not clearing; reason: '${reason}'`
);
return;
}
log.info(`ProfileService.clearAll: Clearing; reason: '${reason}'`);
try {
this.isPaused = true;
this.jobQueue.pause();
this.jobsByConversationId.forEach(job => {
job.reject(
new Error(
`ProfileService.clearAll: job cancelled because '${reason}'`
)
);
});
this.jobsByConversationId.clear();
this.jobQueue.clear();
this.jobQueue.start();
} finally {
this.isPaused = false;
log.info('ProfileService.clearAll: Done clearing');
}
}
public async pause(timeInMS: number): Promise<void> {
if (this.isPaused) {
log.warn('ProfileService.pause: Already paused, not pausing again.');
return;
}
log.info(`ProfileService.pause: Pausing queue for ${timeInMS}ms`);
this.isPaused = true;
this.jobQueue.pause();
try {
await sleep(timeInMS);
} finally {
log.info('ProfileService.pause: Restarting queue');
this.jobQueue.start();
this.isPaused = false;
}
}
}
export const profileService = new ProfileService();
async function doGetProfile(c: ConversationModel): Promise<void> {
const idForLogging = c.idForLogging();
const { messaging } = window.textsecure;
strictAssert(
messaging,
'getProfile: window.textsecure.messaging not available'
);
const { updatesUrl } = window.SignalContext.config;
strictAssert(
typeof updatesUrl === 'string',
'getProfile: expected updatesUrl to be a defined string'
);
const clientZkProfileCipher = getClientZkProfileOperations(
window.getServerPublicParams()
);
const userLanguages = getUserLanguages(
navigator.languages,
window.getLocale()
);
let profile;
c.deriveAccessKeyIfNeeded();
const profileKey = c.get('profileKey');
const profileKeyVersion = c.deriveProfileKeyVersion();
const uuid = c.getCheckedUuid('getProfile');
const existingProfileKeyCredential = c.get('profileKeyCredential');
const lastProfile = c.get('lastProfile');
let profileCredentialRequestContext:
| undefined
| ProfileKeyCredentialRequestContext;
let getProfileOptions: GetProfileOptionsType | GetProfileUnauthOptionsType;
let accessKey = c.get('accessKey');
if (profileKey) {
strictAssert(
profileKeyVersion && accessKey,
'profileKeyVersion and accessKey are derived from profileKey'
);
if (existingProfileKeyCredential) {
getProfileOptions = {
accessKey,
profileKeyVersion,
userLanguages,
};
} else {
log.info(
'getProfile: generating profile key credential request for ' +
`conversation ${idForLogging}`
);
let profileKeyCredentialRequestHex: undefined | string;
({
requestHex: profileKeyCredentialRequestHex,
context: profileCredentialRequestContext,
} = generateProfileKeyCredentialRequest(
clientZkProfileCipher,
uuid.toString(),
profileKey
));
getProfileOptions = {
accessKey,
userLanguages,
profileKeyVersion,
profileKeyCredentialRequest: profileKeyCredentialRequestHex,
};
}
} else {
strictAssert(
!accessKey,
'accessKey have to be absent because there is no profileKey'
);
if (lastProfile?.profileKeyVersion) {
getProfileOptions = {
userLanguages,
profileKeyVersion: lastProfile.profileKeyVersion,
};
} else {
getProfileOptions = { userLanguages };
}
}
const isVersioned = Boolean(getProfileOptions.profileKeyVersion);
log.info(
`getProfile: getting ${isVersioned ? 'versioned' : 'unversioned'} ` +
`profile for conversation ${idForLogging}`
);
try {
if (getProfileOptions.accessKey) {
try {
profile = await messaging.getProfile(uuid, getProfileOptions);
} catch (error) {
if (!(error instanceof HTTPError)) {
throw error;
}
if (error.code === 401 || error.code === 403) {
if (isMe(c.attributes)) {
throw error;
}
await c.setProfileKey(undefined);
// Retry fetch using last known profileKeyVersion or fetch
// unversioned profile.
return doGetProfile(c);
}
if (error.code === 404) {
await c.removeLastProfile(lastProfile);
}
throw error;
}
} else {
try {
// We won't get the credential, but lets either fetch:
// - a versioned profile using last known profileKeyVersion
// - some basic profile information (capabilities, badges, etc).
profile = await messaging.getProfile(uuid, getProfileOptions);
} catch (error) {
if (error instanceof HTTPError && error.code === 404) {
log.info(`getProfile: failed to find a profile for ${idForLogging}`);
await c.removeLastProfile(lastProfile);
if (!isVersioned) {
log.info(`getProfile: marking ${idForLogging} as unregistered`);
c.setUnregistered();
}
}
throw error;
}
}
if (isMe(c.attributes) && profileKey && profileKeyVersion) {
try {
await maybeGetPNICredential(c, {
clientZkProfileCipher,
profileKey,
profileKeyVersion,
userLanguages,
});
} catch (error) {
log.warn(
'getProfile failed to get our own PNI credential',
Errors.toLogFormat(error)
);
}
}
if (profile.identityKey) {
const identityKey = Bytes.fromBase64(profile.identityKey);
const changed = await window.textsecure.storage.protocol.saveIdentity(
new Address(uuid, 1),
identityKey,
false
);
if (changed) {
// save identity will close all sessions except for .1, so we
// must close that one manually.
const ourUuid = window.textsecure.storage.user.getCheckedUuid();
await window.textsecure.storage.protocol.archiveSession(
new QualifiedAddress(ourUuid, new Address(uuid, 1))
);
}
}
// Update accessKey to prevent race conditions. Since we run asynchronous
// requests above - it is possible that someone updates or erases
// the profile key from under us.
accessKey = c.get('accessKey');
if (profile.unrestrictedUnidentifiedAccess && profile.unidentifiedAccess) {
log.info(
`getProfile: setting sealedSender to UNRESTRICTED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.UNRESTRICTED,
});
} else if (accessKey && profile.unidentifiedAccess) {
const haveCorrectKey = verifyAccessKey(
Bytes.fromBase64(accessKey),
Bytes.fromBase64(profile.unidentifiedAccess)
);
if (haveCorrectKey) {
log.info(
`getProfile: setting sealedSender to ENABLED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.ENABLED,
});
} else {
log.warn(
`getProfile: setting sealedSender to DISABLED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.DISABLED,
});
}
} else {
log.info(
`getProfile: setting sealedSender to DISABLED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.DISABLED,
});
}
const rawDecryptionKey = c.get('profileKey') || lastProfile?.profileKey;
const decryptionKey = rawDecryptionKey
? Bytes.fromBase64(rawDecryptionKey)
: undefined;
if (profile.about) {
if (decryptionKey) {
const decrypted = decryptProfile(
Bytes.fromBase64(profile.about),
decryptionKey
);
c.set('about', Bytes.toString(trimForDisplay(decrypted)));
}
} else {
c.unset('about');
}
if (profile.aboutEmoji) {
if (decryptionKey) {
const decrypted = decryptProfile(
Bytes.fromBase64(profile.aboutEmoji),
decryptionKey
);
c.set('aboutEmoji', Bytes.toString(trimForDisplay(decrypted)));
}
} else {
c.unset('aboutEmoji');
}
if (profile.paymentAddress && isMe(c.attributes)) {
window.storage.put('paymentAddress', profile.paymentAddress);
}
if (profile.capabilities) {
c.set({ capabilities: profile.capabilities });
} else {
c.unset('capabilities');
}
const badges = parseBadgesFromServer(profile.badges, updatesUrl);
if (badges.length) {
await window.reduxActions.badges.updateOrCreate(badges);
c.set({
badges: badges.map(badge => ({
id: badge.id,
...('expiresAt' in badge
? {
expiresAt: badge.expiresAt,
isVisible: badge.isVisible,
}
: {}),
})),
});
} else {
c.unset('badges');
}
if (profileCredentialRequestContext) {
if (profile.credential) {
const {
credential: profileKeyCredential,
expiration: profileKeyCredentialExpiration,
} = handleProfileKeyCredential(
clientZkProfileCipher,
profileCredentialRequestContext,
profile.credential
);
c.set({ profileKeyCredential, profileKeyCredentialExpiration });
} else {
c.unset('profileKeyCredential');
}
}
} catch (error) {
if (!(error instanceof HTTPError)) {
throw error;
}
switch (error.code) {
case 401:
case 403:
if (
c.get('sealedSender') === SEALED_SENDER.ENABLED ||
c.get('sealedSender') === SEALED_SENDER.UNRESTRICTED
) {
log.warn(
`getProfile: Got 401/403 when using accessKey for ${idForLogging}, removing profileKey`
);
if (!isMe(c.attributes)) {
await c.setProfileKey(undefined);
}
}
if (c.get('sealedSender') === SEALED_SENDER.UNKNOWN) {
log.warn(
`getProfile: Got 401/403 when using accessKey for ${idForLogging}, setting sealedSender = DISABLED`
);
c.set('sealedSender', SEALED_SENDER.DISABLED);
}
return;
default:
log.warn(
'getProfile failure:',
idForLogging,
Errors.toLogFormat(error)
);
return;
}
}
const decryptionKeyString = profileKey || lastProfile?.profileKey;
const decryptionKey = decryptionKeyString
? Bytes.fromBase64(decryptionKeyString)
: undefined;
let isSuccessfullyDecrypted = true;
if (profile.name) {
if (decryptionKey) {
try {
await c.setEncryptedProfileName(profile.name, decryptionKey);
} catch (error) {
log.warn(
'getProfile decryption failure:',
idForLogging,
Errors.toLogFormat(error)
);
isSuccessfullyDecrypted = false;
await c.set({
profileName: undefined,
profileFamilyName: undefined,
});
}
}
} else {
c.set({
profileName: undefined,
profileFamilyName: undefined,
});
}
try {
if (decryptionKey) {
await c.setProfileAvatar(profile.avatar, decryptionKey);
}
} catch (error) {
if (error instanceof HTTPError) {
if (error.code === 403 || error.code === 404) {
log.warn(
`getProfile: profile avatar is missing for conversation ${idForLogging}`
);
}
} else {
log.warn(
`getProfile: failed to decrypt avatar for conversation ${idForLogging}`,
Errors.toLogFormat(error)
);
isSuccessfullyDecrypted = false;
}
}
c.set('profileLastFetchedAt', Date.now());
// After we successfully decrypted - update lastProfile property
if (
isSuccessfullyDecrypted &&
profileKey &&
getProfileOptions.profileKeyVersion
) {
await c.updateLastProfile(lastProfile, {
profileKey,
profileKeyVersion: getProfileOptions.profileKeyVersion,
});
}
window.Signal.Data.updateConversation(c.attributes);
}
async function maybeGetPNICredential(
c: ConversationModel,
{
clientZkProfileCipher,
profileKey,
profileKeyVersion,
userLanguages,
}: {
clientZkProfileCipher: ClientZkProfileOperations;
profileKey: string;
profileKeyVersion: string;
userLanguages: ReadonlyArray<string>;
}
): Promise<void> {
// Already present and up-to-date
if (c.get('pniCredential')) {
return;
}
strictAssert(isMe(c.attributes), 'Has to fetch PNI credential for ourselves');
log.info('maybeGetPNICredential: requesting PNI credential');
const { storage, messaging } = window.textsecure;
strictAssert(
messaging,
'maybeGetPNICredential: window.textsecure.messaging not available'
);
const ourACI = storage.user.getCheckedUuid(UUIDKind.ACI);
const ourPNI = storage.user.getCheckedUuid(UUIDKind.PNI);
const {
requestHex: profileKeyCredentialRequestHex,
context: profileCredentialRequestContext,
} = generatePNICredentialRequest(
clientZkProfileCipher,
ourACI.toString(),
ourPNI.toString(),
profileKey
);
const profile = await messaging.getProfile(ourACI, {
userLanguages,
profileKeyVersion,
profileKeyCredentialRequest: profileKeyCredentialRequestHex,
credentialType: 'pni',
});
strictAssert(
profile.pniCredential,
'We must get the credential for ourselves'
);
const pniCredential = handleProfileKeyPNICredential(
clientZkProfileCipher,
profileCredentialRequestContext,
profile.pniCredential
);
c.set({ pniCredential });
log.info('maybeGetPNICredential: updated PNI credential');
}

View File

@ -1242,7 +1242,7 @@ async function processRemoteRecords(
);
// Intentionally not awaiting
pMap(needProfileFetch, convo => convo.getProfiles(), { concurrency: 3 });
needProfileFetch.map(convo => convo.getProfiles());
// Collect full map of previously and currently unknown records
const unknownRecords: Map<string, UnknownRecord> = new Map();

View File

@ -3,7 +3,6 @@
/* eslint-disable camelcase */
import PQueue from 'p-queue';
import type { ThunkAction } from 'redux-thunk';
import {
difference,
@ -1586,9 +1585,6 @@ function conversationStoppedByMissingVerification(payload: {
untrustedUuids: ReadonlyArray<string>;
}): ConversationStoppedByMissingVerificationActionType {
// Fetching profiles to ensure that we have their latest identity key in storage
const profileFetchQueue = new PQueue({
concurrency: 3,
});
payload.untrustedUuids.forEach(uuid => {
const conversation = window.ConversationController.get(uuid);
if (!conversation) {
@ -1598,10 +1594,8 @@ function conversationStoppedByMissingVerification(payload: {
return;
}
profileFetchQueue.add(() => {
const active = conversation.getActiveProfileFetch();
return active || conversation.getProfiles();
});
// Intentionally not awaiting here
conversation.getProfiles();
});
return {

View File

@ -0,0 +1,140 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { sleep } from '../../util';
import { MINUTE } from '../../util/durations';
import { ProfileService } from '../../services/profiles';
import { UUID } from '../../types/UUID';
import { HTTPError } from '../../textsecure/Errors';
describe('util/profiles', () => {
const UUID_1 = UUID.generate().toString();
const UUID_2 = UUID.generate().toString();
const UUID_3 = UUID.generate().toString();
const UUID_4 = UUID.generate().toString();
const UUID_5 = UUID.generate().toString();
beforeEach(async () => {
await window.ConversationController.getOrCreateAndWait(UUID_1, 'private');
await window.ConversationController.getOrCreateAndWait(UUID_2, 'private');
await window.ConversationController.getOrCreateAndWait(UUID_3, 'private');
await window.ConversationController.getOrCreateAndWait(UUID_4, 'private');
await window.ConversationController.getOrCreateAndWait(UUID_5, 'private');
});
describe('clearAll', () => {
it('Cancels all in-flight requests', async () => {
const getProfileWithLongDelay = async () => {
await sleep(MINUTE);
};
const service = new ProfileService(getProfileWithLongDelay);
const promise1 = service.get(UUID_1);
const promise2 = service.get(UUID_2);
const promise3 = service.get(UUID_3);
const promise4 = service.get(UUID_4);
service.clearAll('testing');
await assert.isRejected(promise1, 'job cancelled');
await assert.isRejected(promise2, 'job cancelled');
await assert.isRejected(promise3, 'job cancelled');
await assert.isRejected(promise4, 'job cancelled');
});
});
describe('pause', () => {
it('pauses the queue', async () => {
let runCount = 0;
const getProfileWithIncrement = () => {
runCount += 1;
return Promise.resolve();
};
const service = new ProfileService(getProfileWithIncrement);
// Queued and immediately started due to concurrency = 3
service.get(UUID_1);
service.get(UUID_2);
service.get(UUID_3);
// Queued but only run after paused queue restarts
const lastPromise = service.get(UUID_4);
const pausePromise = service.pause(5);
assert.strictEqual(runCount, 3, 'as pause starts');
await pausePromise;
await lastPromise;
assert.strictEqual(runCount, 4, 'after last promise');
});
});
describe('get', () => {
it('throws if we are currently paused', async () => {
let runCount = 0;
const getProfileWithIncrement = () => {
runCount += 1;
return Promise.resolve();
};
const service = new ProfileService(getProfileWithIncrement);
const pausePromise = service.pause(5);
// None of these are even queued
const promise1 = service.get(UUID_1);
const promise2 = service.get(UUID_2);
const promise3 = service.get(UUID_3);
const promise4 = service.get(UUID_4);
await assert.isRejected(promise1, 'paused queue');
await assert.isRejected(promise2, 'paused queue');
await assert.isRejected(promise3, 'paused queue');
await assert.isRejected(promise4, 'paused queue');
await pausePromise;
assert.strictEqual(runCount, 0);
});
it('clears all outstanding jobs if we get a 413, then pauses', async () => {
let runCount = 0;
const getProfileWhichThrows = async () => {
runCount += 1;
const error = new HTTPError('fake 413', {
code: 413,
headers: {
'retry-after': '1',
},
});
throw error;
};
const service = new ProfileService(getProfileWhichThrows);
// Queued and immediately started due to concurrency = 3
const promise1 = service.get(UUID_1);
const promise2 = service.get(UUID_2);
const promise3 = service.get(UUID_3);
// Never started, but queued
const promise4 = service.get(UUID_4);
assert.strictEqual(runCount, 3, 'before await');
await assert.isRejected(promise1, 'fake 413');
// Never queued
const promise5 = service.get(UUID_5);
await assert.isRejected(promise2, 'job cancelled');
await assert.isRejected(promise3, 'job cancelled');
await assert.isRejected(promise4, 'job cancelled');
await assert.isRejected(promise5, 'paused queue');
assert.strictEqual(runCount, 3, 'after await');
});
});
});

View File

@ -1,489 +1,8 @@
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type {
ProfileKeyCredentialRequestContext,
ClientZkProfileOperations,
} from '@signalapp/libsignal-client/zkgroup';
import { SEALED_SENDER } from '../types/SealedSender';
import * as Errors from '../types/errors';
import type {
GetProfileOptionsType,
GetProfileUnauthOptionsType,
} from '../textsecure/WebAPI';
import { HTTPError } from '../textsecure/Errors';
import { Address } from '../types/Address';
import { QualifiedAddress } from '../types/QualifiedAddress';
import { UUIDKind } from '../types/UUID';
import * as Bytes from '../Bytes';
import { trimForDisplay, verifyAccessKey, decryptProfile } from '../Crypto';
import {
generateProfileKeyCredentialRequest,
generatePNICredentialRequest,
getClientZkProfileOperations,
handleProfileKeyCredential,
handleProfileKeyPNICredential,
} from './zkgroup';
import { isMe } from './whatTypeOfConversation';
import type { ConversationModel } from '../models/conversations';
import * as log from '../logging/log';
import { getUserLanguages } from './userLanguages';
import { parseBadgesFromServer } from '../badges/parseBadgesFromServer';
import { strictAssert } from './assert';
async function maybeGetPNICredential(
c: ConversationModel,
{
clientZkProfileCipher,
profileKey,
profileKeyVersion,
userLanguages,
}: {
clientZkProfileCipher: ClientZkProfileOperations;
profileKey: string;
profileKeyVersion: string;
userLanguages: ReadonlyArray<string>;
}
): Promise<void> {
// Already present and up-to-date
if (c.get('pniCredential')) {
return;
}
strictAssert(isMe(c.attributes), 'Has to fetch PNI credential for ourselves');
log.info('maybeGetPNICredential: requesting PNI credential');
const { storage, messaging } = window.textsecure;
strictAssert(
messaging,
'maybeGetPNICredential: window.textsecure.messaging not available'
);
const ourACI = storage.user.getCheckedUuid(UUIDKind.ACI);
const ourPNI = storage.user.getCheckedUuid(UUIDKind.PNI);
const {
requestHex: profileKeyCredentialRequestHex,
context: profileCredentialRequestContext,
} = generatePNICredentialRequest(
clientZkProfileCipher,
ourACI.toString(),
ourPNI.toString(),
profileKey
);
const profile = await messaging.getProfile(ourACI, {
userLanguages,
profileKeyVersion,
profileKeyCredentialRequest: profileKeyCredentialRequestHex,
credentialType: 'pni',
});
strictAssert(
profile.pniCredential,
'We must get the credential for ourselves'
);
const pniCredential = handleProfileKeyPNICredential(
clientZkProfileCipher,
profileCredentialRequestContext,
profile.pniCredential
);
c.set({ pniCredential });
log.info('maybeGetPNICredential: updated PNI credential');
}
async function doGetProfile(c: ConversationModel): Promise<void> {
const idForLogging = c.idForLogging();
const { messaging } = window.textsecure;
strictAssert(
messaging,
'getProfile: window.textsecure.messaging not available'
);
const { updatesUrl } = window.SignalContext.config;
strictAssert(
typeof updatesUrl === 'string',
'getProfile: expected updatesUrl to be a defined string'
);
const clientZkProfileCipher = getClientZkProfileOperations(
window.getServerPublicParams()
);
const userLanguages = getUserLanguages(
navigator.languages,
window.getLocale()
);
let profile;
c.deriveAccessKeyIfNeeded();
const profileKey = c.get('profileKey');
const profileKeyVersion = c.deriveProfileKeyVersion();
const uuid = c.getCheckedUuid('getProfile');
const existingProfileKeyCredential = c.get('profileKeyCredential');
const lastProfile = c.get('lastProfile');
let profileCredentialRequestContext:
| undefined
| ProfileKeyCredentialRequestContext;
let getProfileOptions: GetProfileOptionsType | GetProfileUnauthOptionsType;
let accessKey = c.get('accessKey');
if (profileKey) {
strictAssert(
profileKeyVersion && accessKey,
'profileKeyVersion and accessKey are derived from profileKey'
);
if (existingProfileKeyCredential) {
getProfileOptions = {
accessKey,
profileKeyVersion,
userLanguages,
};
} else {
log.info(
'getProfile: generating profile key credential request for ' +
`conversation ${idForLogging}`
);
let profileKeyCredentialRequestHex: undefined | string;
({
requestHex: profileKeyCredentialRequestHex,
context: profileCredentialRequestContext,
} = generateProfileKeyCredentialRequest(
clientZkProfileCipher,
uuid.toString(),
profileKey
));
getProfileOptions = {
accessKey,
userLanguages,
profileKeyVersion,
profileKeyCredentialRequest: profileKeyCredentialRequestHex,
};
}
} else {
strictAssert(
!accessKey,
'accessKey have to be absent because there is no profileKey'
);
if (lastProfile?.profileKeyVersion) {
getProfileOptions = {
userLanguages,
profileKeyVersion: lastProfile.profileKeyVersion,
};
} else {
getProfileOptions = { userLanguages };
}
}
const isVersioned = Boolean(getProfileOptions.profileKeyVersion);
log.info(
`getProfile: getting ${isVersioned ? 'versioned' : 'unversioned'} ` +
`profile for conversation ${idForLogging}`
);
try {
if (getProfileOptions.accessKey) {
try {
profile = await messaging.getProfile(uuid, getProfileOptions);
} catch (error) {
if (!(error instanceof HTTPError)) {
throw error;
}
if (error.code === 401 || error.code === 403) {
if (isMe(c.attributes)) {
throw error;
}
await c.setProfileKey(undefined);
// Retry fetch using last known profileKeyVersion or fetch
// unversioned profile.
return doGetProfile(c);
}
if (error.code === 404) {
await c.removeLastProfile(lastProfile);
}
throw error;
}
} else {
try {
// We won't get the credential, but lets either fetch:
// - a versioned profile using last known profileKeyVersion
// - some basic profile information (capabilities, badges, etc).
profile = await messaging.getProfile(uuid, getProfileOptions);
} catch (error) {
if (error instanceof HTTPError && error.code === 404) {
log.info(`getProfile: failed to find a profile for ${idForLogging}`);
await c.removeLastProfile(lastProfile);
if (!isVersioned) {
log.info(`getProfile: marking ${idForLogging} as unregistered`);
c.setUnregistered();
}
}
throw error;
}
}
if (isMe(c.attributes) && profileKey && profileKeyVersion) {
try {
await maybeGetPNICredential(c, {
clientZkProfileCipher,
profileKey,
profileKeyVersion,
userLanguages,
});
} catch (error) {
log.warn(
'getProfile failed to get our own PNI credential',
Errors.toLogFormat(error)
);
}
}
if (profile.identityKey) {
const identityKey = Bytes.fromBase64(profile.identityKey);
const changed = await window.textsecure.storage.protocol.saveIdentity(
new Address(uuid, 1),
identityKey,
false
);
if (changed) {
// save identity will close all sessions except for .1, so we
// must close that one manually.
const ourUuid = window.textsecure.storage.user.getCheckedUuid();
await window.textsecure.storage.protocol.archiveSession(
new QualifiedAddress(ourUuid, new Address(uuid, 1))
);
}
}
// Update accessKey to prevent race conditions. Since we run asynchronous
// requests above - it is possible that someone updates or erases
// the profile key from under us.
accessKey = c.get('accessKey');
if (profile.unrestrictedUnidentifiedAccess && profile.unidentifiedAccess) {
log.info(
`getProfile: setting sealedSender to UNRESTRICTED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.UNRESTRICTED,
});
} else if (accessKey && profile.unidentifiedAccess) {
const haveCorrectKey = verifyAccessKey(
Bytes.fromBase64(accessKey),
Bytes.fromBase64(profile.unidentifiedAccess)
);
if (haveCorrectKey) {
log.info(
`getProfile: setting sealedSender to ENABLED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.ENABLED,
});
} else {
log.warn(
`getProfile: setting sealedSender to DISABLED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.DISABLED,
});
}
} else {
log.info(
`getProfile: setting sealedSender to DISABLED for conversation ${idForLogging}`
);
c.set({
sealedSender: SEALED_SENDER.DISABLED,
});
}
const rawDecryptionKey = c.get('profileKey') || lastProfile?.profileKey;
const decryptionKey = rawDecryptionKey
? Bytes.fromBase64(rawDecryptionKey)
: undefined;
if (profile.about) {
if (decryptionKey) {
const decrypted = decryptProfile(
Bytes.fromBase64(profile.about),
decryptionKey
);
c.set('about', Bytes.toString(trimForDisplay(decrypted)));
}
} else {
c.unset('about');
}
if (profile.aboutEmoji) {
if (decryptionKey) {
const decrypted = decryptProfile(
Bytes.fromBase64(profile.aboutEmoji),
decryptionKey
);
c.set('aboutEmoji', Bytes.toString(trimForDisplay(decrypted)));
}
} else {
c.unset('aboutEmoji');
}
if (profile.paymentAddress && isMe(c.attributes)) {
window.storage.put('paymentAddress', profile.paymentAddress);
}
if (profile.capabilities) {
c.set({ capabilities: profile.capabilities });
} else {
c.unset('capabilities');
}
const badges = parseBadgesFromServer(profile.badges, updatesUrl);
if (badges.length) {
await window.reduxActions.badges.updateOrCreate(badges);
c.set({
badges: badges.map(badge => ({
id: badge.id,
...('expiresAt' in badge
? {
expiresAt: badge.expiresAt,
isVisible: badge.isVisible,
}
: {}),
})),
});
} else {
c.unset('badges');
}
if (profileCredentialRequestContext) {
if (profile.credential) {
const {
credential: profileKeyCredential,
expiration: profileKeyCredentialExpiration,
} = handleProfileKeyCredential(
clientZkProfileCipher,
profileCredentialRequestContext,
profile.credential
);
c.set({ profileKeyCredential, profileKeyCredentialExpiration });
} else {
c.unset('profileKeyCredential');
}
}
} catch (error) {
if (!(error instanceof HTTPError)) {
throw error;
}
switch (error.code) {
case 401:
case 403:
if (
c.get('sealedSender') === SEALED_SENDER.ENABLED ||
c.get('sealedSender') === SEALED_SENDER.UNRESTRICTED
) {
log.warn(
`getProfile: Got 401/403 when using accessKey for ${idForLogging}, removing profileKey`
);
if (!isMe(c.attributes)) {
await c.setProfileKey(undefined);
}
}
if (c.get('sealedSender') === SEALED_SENDER.UNKNOWN) {
log.warn(
`getProfile: Got 401/403 when using accessKey for ${idForLogging}, setting sealedSender = DISABLED`
);
c.set('sealedSender', SEALED_SENDER.DISABLED);
}
return;
default:
log.warn(
'getProfile failure:',
idForLogging,
Errors.toLogFormat(error)
);
return;
}
}
const decryptionKeyString = profileKey || lastProfile?.profileKey;
const decryptionKey = decryptionKeyString
? Bytes.fromBase64(decryptionKeyString)
: undefined;
let isSuccessfullyDecrypted = true;
if (profile.name) {
if (decryptionKey) {
try {
await c.setEncryptedProfileName(profile.name, decryptionKey);
} catch (error) {
log.warn(
'getProfile decryption failure:',
idForLogging,
Errors.toLogFormat(error)
);
isSuccessfullyDecrypted = false;
await c.set({
profileName: undefined,
profileFamilyName: undefined,
});
}
}
} else {
c.set({
profileName: undefined,
profileFamilyName: undefined,
});
}
try {
if (decryptionKey) {
await c.setProfileAvatar(profile.avatar, decryptionKey);
}
} catch (error) {
if (error instanceof HTTPError) {
if (error.code === 403 || error.code === 404) {
log.warn(
`getProfile: profile avatar is missing for conversation ${idForLogging}`
);
}
} else {
log.warn(
`getProfile: failed to decrypt avatar for conversation ${idForLogging}`,
Errors.toLogFormat(error)
);
isSuccessfullyDecrypted = false;
}
}
c.set('profileLastFetchedAt', Date.now());
// After we successfully decrypted - update lastProfile property
if (
isSuccessfullyDecrypted &&
profileKey &&
getProfileOptions.profileKeyVersion
) {
await c.updateLastProfile(lastProfile, {
profileKey,
profileKeyVersion: getProfileOptions.profileKeyVersion,
});
}
window.Signal.Data.updateConversation(c.attributes);
}
import { profileService } from '../services/profiles';
export async function getProfile(
providedUuid?: string,
@ -499,5 +18,5 @@ export async function getProfile(
return;
}
await doGetProfile(c);
return profileService.get(c.id);
}

View File

@ -35,7 +35,7 @@ export function isConversationAccepted(
const { sentMessageCount } = conversationAttrs;
const hasSentMessages = sentMessageCount > 0;
const hasSentMessages = (sentMessageCount || 0) > 0;
const hasMessagesBeforeMessageRequests =
(conversationAttrs.messageCountBeforeMessageRequests || 0) > 0;
const hasNoMessages = (conversationAttrs.messageCount || 0) === 0;
@ -47,7 +47,7 @@ export function isConversationAccepted(
const isEmptyWhitelistedGroup =
hasNoMessages &&
!isDirectConversation(conversationAttrs) &&
conversationAttrs.profileSharing;
Boolean(conversationAttrs.profileSharing);
return (
isFromOrAddedByTrustedContact(conversationAttrs) ||