diff --git a/ts/model-types.d.ts b/ts/model-types.d.ts index 1d6b1e843..68763ac85 100644 --- a/ts/model-types.d.ts +++ b/ts/model-types.d.ts @@ -273,6 +273,7 @@ export type ConversationAttributesType = { needsVerification?: boolean; profileSharing: boolean; storageID?: string; + storageVersion?: number; storageUnknownFields?: string; unreadCount?: number; version: number; diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 24d7047bf..2d49a76d1 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -144,6 +144,7 @@ const ATTRIBUTES_THAT_DONT_INVALIDATE_PROPS_CACHE = new Set([ 'profileLastFetchedAt', 'needsStorageServiceSync', 'storageID', + 'storageVersion', 'storageUnknownFields', ]); @@ -5047,7 +5048,7 @@ export class ConversationModel extends window.Backbone }); } - startMuteTimer(): void { + startMuteTimer({ viaStorageServiceSync = false } = {}): void { if (this.muteTimer !== undefined) { clearTimeout(this.muteTimer); this.muteTimer = undefined; @@ -5057,7 +5058,7 @@ export class ConversationModel extends window.Backbone if (isNumber(muteExpiresAt) && muteExpiresAt < Number.MAX_SAFE_INTEGER) { const delay = muteExpiresAt - Date.now(); if (delay <= 0) { - this.setMuteExpiration(0); + this.setMuteExpiration(0, { viaStorageServiceSync }); return; } @@ -5076,7 +5077,10 @@ export class ConversationModel extends window.Backbone } this.set({ muteExpiresAt }); - this.startMuteTimer(); + + // Don't cause duplicate captureChange + this.startMuteTimer({ viaStorageServiceSync: true }); + if (!viaStorageServiceSync) { this.captureChange('mutedUntilTimestamp'); } diff --git a/ts/services/storage.ts b/ts/services/storage.ts index 470b14655..2c15cf490 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -23,6 +23,7 @@ import { toGroupV1Record, toGroupV2Record, } from './storageRecordOps'; +import type { MergeResultType } from './storageRecordOps'; import type { ConversationModel } from '../models/conversations'; import { strictAssert } from '../util/assert'; import * as durations from '../util/durations'; @@ -70,17 +71,30 @@ const conflictBackOff = new BackOff([ 30 * durations.SECOND, ]); -function redactStorageID(storageID: string): string { - return storageID.substring(0, 3); +function redactStorageID( + storageID: string, + version?: number, + conversation?: ConversationModel +): string { + const convoId = conversation ? ` ${conversation?.idForLogging()}` : ''; + return `${version ?? '?'}:${storageID.substring(0, 3)}${convoId}`; } type RemoteRecord = { itemType: number; storageID: string; + storageVersion?: number; }; type UnknownRecord = RemoteRecord; +function unknownRecordToRedactedID({ + storageID, + storageVersion, +}: UnknownRecord): string { + return redactStorageID(storageID, storageVersion); +} + async function encryptRecord( storageID: string | undefined, storageRecord: Proto.IStorageRecord @@ -132,9 +146,8 @@ async function generateManifest( isNewManifest = false ): Promise { log.info( - 'storageService.generateManifest: generating manifest', - version, - isNewManifest + `storageService.upload(${version}): generating manifest ` + + `new=${isNewManifest}` ); await window.ConversationController.checkForConflicts(); @@ -195,82 +208,84 @@ async function generateManifest( storageRecord.groupV1 = await toGroupV1Record(conversation); identifier.type = ITEM_TYPE.GROUPV1; } else { - log.info( - 'storageService.generateManifest: unknown conversation', - conversation.idForLogging() + log.warn( + `storageService.upload(${version}): ` + + `unknown conversation=${conversation.idForLogging()}` ); } - if (storageRecord) { - const currentStorageID = conversation.get('storageID'); - - const isNewItem = - isNewManifest || - Boolean(conversation.get('needsStorageServiceSync')) || - !currentStorageID; - - const storageID = isNewItem - ? Bytes.toBase64(generateStorageID()) - : currentStorageID; - - let storageItem; - try { - // eslint-disable-next-line no-await-in-loop - storageItem = await encryptRecord(storageID, storageRecord); - } catch (err) { - log.error( - 'storageService.generateManifest: encrypt record failed:', - err && err.stack ? err.stack : String(err) - ); - throw err; - } - identifier.raw = storageItem.key; - - // When a client needs to update a given record it should create it - // under a new key and delete the existing key. - if (isNewItem) { - newItems.add(storageItem); - - if (storageID) { - insertKeys.push(storageID); - log.info( - 'storageService.generateManifest: new key', - conversation.idForLogging(), - redactStorageID(storageID) - ); - } else { - log.info( - 'storageService.generateManifest: no storage id', - conversation.idForLogging() - ); - } - - const oldStorageID = conversation.get('storageID'); - if (oldStorageID) { - log.info( - 'storageService.generateManifest: deleting key', - redactStorageID(oldStorageID) - ); - deleteKeys.push(Bytes.fromBase64(oldStorageID)); - } - - conversationsToUpdate.push({ - conversation, - storageID, - }); - } - - manifestRecordKeys.add(identifier); + if (!storageRecord) { + continue; } + + const currentStorageID = conversation.get('storageID'); + const currentStorageVersion = conversation.get('storageVersion'); + + const currentRedactedID = currentStorageID + ? redactStorageID(currentStorageID, currentStorageVersion) + : undefined; + + const isNewItem = + isNewManifest || + Boolean(conversation.get('needsStorageServiceSync')) || + !currentStorageID; + + const storageID = isNewItem + ? Bytes.toBase64(generateStorageID()) + : currentStorageID; + + let storageItem; + try { + // eslint-disable-next-line no-await-in-loop + storageItem = await encryptRecord(storageID, storageRecord); + } catch (err) { + log.error( + `storageService.upload(${version}): encrypt record failed:`, + Errors.toLogFormat(err) + ); + throw err; + } + identifier.raw = storageItem.key; + + // When a client needs to update a given record it should create it + // under a new key and delete the existing key. + if (isNewItem) { + newItems.add(storageItem); + + insertKeys.push(storageID); + const newRedactedID = redactStorageID(storageID, version, conversation); + if (currentStorageID) { + log.info( + `storageService.upload(${version}): ` + + `updating from=${currentRedactedID} ` + + `to=${newRedactedID}` + ); + deleteKeys.push(Bytes.fromBase64(currentStorageID)); + } else { + log.info( + `storageService.upload(${version}): adding key=${newRedactedID}` + ); + } + + conversationsToUpdate.push({ + conversation, + storageID, + }); + } + + manifestRecordKeys.add(identifier); } const unknownRecordsArray: ReadonlyArray = ( window.storage.get('storage-service-unknown-records') || [] ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType)); + const redactedUnknowns = unknownRecordsArray.map(unknownRecordToRedactedID); + log.info( - 'storageService.generateManifest: adding unknown records:', - unknownRecordsArray.length + `storageService.upload(${version}): adding unknown ` + + `records=${JSON.stringify(redactedUnknowns)} ` + + `count=${redactedUnknowns.length}` ); // When updating the manifest, ensure all "unknown" keys are added to the @@ -287,10 +302,11 @@ async function generateManifest( 'storage-service-error-records', new Array() ); + const redactedErrors = recordsWithErrors.map(unknownRecordToRedactedID); log.info( - 'storageService.generateManifest: adding records that had errors in the previous merge', - recordsWithErrors.length + `storageService.upload(${version}): adding error ` + + `records=${JSON.stringify(redactedErrors)} count=${redactedErrors.length}` ); // These records failed to merge in the previous fetchManifest, but we still @@ -320,8 +336,10 @@ async function generateManifest( rawDuplicates.has(identifier.raw) || typeRawDuplicates.has(typeAndRaw) ) { - log.info( - 'storageService.generateManifest: removing duplicate identifier from manifest', + log.warn( + `storageService.upload(${version}): removing from duplicate item ` + + 'from the manifest', + redactStorageID(storageID), identifier.type ); manifestRecordKeys.delete(identifier); @@ -334,8 +352,9 @@ async function generateManifest( key => Bytes.toBase64(key) === storageID ); if (hasDeleteKey) { - log.info( - 'storageService.generateManifest: removing key which has been deleted', + log.warn( + `storageService.upload(${version}): removing key which has been deleted`, + redactStorageID(storageID), identifier.type ); manifestRecordKeys.delete(identifier); @@ -344,7 +363,10 @@ async function generateManifest( // Ensure that there is *exactly* one Account type in the manifest if (identifier.type === ITEM_TYPE.ACCOUNT) { if (hasAccountType) { - log.info('storageService.generateManifest: removing duplicate account'); + log.warn( + `storageService.upload(${version}): removing duplicate account`, + redactStorageID(storageID) + ); manifestRecordKeys.delete(identifier); } hasAccountType = true; @@ -362,8 +384,9 @@ async function generateManifest( const storageID = Bytes.toBase64(storageItem.key); if (storageKeyDuplicates.has(storageID)) { - log.info( - 'storageService.generateManifest: removing duplicate identifier from inserts', + log.warn( + `storageService.upload(${version}): ` + + 'removing duplicate identifier from inserts', redactStorageID(storageID) ); newItems.delete(storageItem); @@ -413,7 +436,7 @@ async function generateManifest( const remoteDeletes: Array = []; pendingDeletes.forEach(id => remoteDeletes.push(redactStorageID(id))); log.error( - 'Delete key sizes do not match', + `storageService.upload(${version}): delete key sizes do not match`, 'local', localDeletes.join(','), 'remote', @@ -482,16 +505,15 @@ async function uploadManifest( } if (newItems.size === 0 && deleteKeys.length === 0) { - log.info('storageService.uploadManifest: nothing to upload'); + log.info(`storageService.upload(${version}): nothing to upload`); return; } const credentials = window.storage.get('storageCredentials'); try { log.info( - 'storageService.uploadManifest: keys inserting, deleting:', - newItems.size, - deleteKeys.length + `storageService.upload(${version}): inserting=${newItems.size} ` + + `deleting=${deleteKeys.length}` ); const writeOperation = new Proto.WriteOperation(); @@ -499,7 +521,6 @@ async function uploadManifest( writeOperation.insertItem = Array.from(newItems); writeOperation.deleteKey = deleteKeys; - log.info('storageService.uploadManifest: uploading...', version); await window.textsecure.messaging.modifyStorageRecords( Proto.WriteOperation.encode(writeOperation).finish(), { @@ -508,34 +529,38 @@ async function uploadManifest( ); log.info( - 'storageService.uploadManifest: upload done, updating conversation(s) with new storageIDs:', - conversationsToUpdate.length + `storageService.upload(${version}): upload complete, updating ` + + `conversations=${conversationsToUpdate.length}` ); // update conversations with the new storageID conversationsToUpdate.forEach(({ conversation, storageID }) => { conversation.set({ needsStorageServiceSync: false, + storageVersion: version, storageID, }); updateConversation(conversation.attributes); }); } catch (err) { log.error( - 'storageService.uploadManifest: failed!', - err && err.stack ? err.stack : String(err) + `storageService.upload(${version}): failed!`, + Errors.toLogFormat(err) ); if (err.code === 409) { if (conflictBackOff.isFull()) { log.error( - 'storageService.uploadManifest: Exceeded maximum consecutive conflicts' + `storageService.upload(${version}): exceeded maximum consecutive ` + + 'conflicts' ); return; } log.info( - `storageService.uploadManifest: Conflict found with v${version}, running sync job times(${conflictBackOff.getIndex()})` + `storageService.upload(${version}): conflict found with ` + + `version=${version}, running sync job ` + + `times=${conflictBackOff.getIndex()}` ); throw err; @@ -544,40 +569,30 @@ async function uploadManifest( throw err; } - log.info( - 'storageService.uploadManifest: setting new manifestVersion', - version - ); + log.info(`storageService.upload(${version}): setting new manifestVersion`); window.storage.put('manifestVersion', version); conflictBackOff.reset(); backOff.reset(); - if (window.ConversationController.areWePrimaryDevice()) { - log.warn( - 'storageService.uploadManifest: We are primary device; not sending sync manifest' - ); - return; - } - try { await singleProtoJobQueue.add( window.textsecure.messaging.getFetchManifestSyncMessage() ); } catch (error) { log.error( - 'storageService.uploadManifest: Failed to queue sync message', + `storageService.upload(${version}): Failed to queue sync message`, Errors.toLogFormat(error) ); } } -async function stopStorageServiceSync() { - log.info('storageService.stopStorageServiceSync'); +async function stopStorageServiceSync(reason: Error) { + log.warn('storageService.stopStorageServiceSync', Errors.toLogFormat(reason)); await window.storage.remove('storageKey'); if (backOff.isFull()) { - log.info( + log.warn( 'storageService.stopStorageServiceSync: too many consecutive stops' ); return; @@ -650,10 +665,10 @@ async function decryptManifest( async function fetchManifest( manifestVersion: number ): Promise { - log.info('storageService.fetchManifest'); + log.info('storageService.sync: fetch start'); if (!window.textsecure.messaging) { - throw new Error('storageService.fetchManifest: We are offline!'); + throw new Error('storageService.sync: we are offline!'); } try { @@ -669,28 +684,19 @@ async function fetchManifest( ); const encryptedManifest = Proto.StorageManifest.decode(manifestBinary); - // if we don't get a value we're assuming that there's no newer manifest - if (!encryptedManifest.value || !encryptedManifest.version) { - log.info('storageService.fetchManifest: nothing changed'); - return; - } - try { return decryptManifest(encryptedManifest); } catch (err) { - await stopStorageServiceSync(); + await stopStorageServiceSync(err); return; } } catch (err) { if (err.code === 204) { - log.info('storageService.fetchManifest: no newer manifest, ok'); + log.info('storageService.sync: no newer manifest, ok'); return; } - log.error( - 'storageService.fetchManifest: failed!', - err && err.stack ? err.stack : String(err) - ); + log.error('storageService.sync: failed!', Errors.toLogFormat(err)); if (err.code === 404) { await createNewManifest(); @@ -714,49 +720,80 @@ type MergedRecordType = UnknownRecord & { }; async function mergeRecord( + storageVersion: number, itemToMerge: MergeableItemType ): Promise { const { itemType, storageID, storageRecord } = itemToMerge; const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; - let hasConflict = false; + let mergeResult: MergeResultType = { hasConflict: false, details: [] }; let isUnsupported = false; let hasError = false; try { if (itemType === ITEM_TYPE.UNKNOWN) { - log.info('storageService.mergeRecord: Unknown item type', storageID); + log.warn('storageService.mergeRecord: Unknown item type', storageID); } else if (itemType === ITEM_TYPE.CONTACT && storageRecord.contact) { - hasConflict = await mergeContactRecord(storageID, storageRecord.contact); + mergeResult = await mergeContactRecord( + storageID, + storageVersion, + storageRecord.contact + ); } else if (itemType === ITEM_TYPE.GROUPV1 && storageRecord.groupV1) { - hasConflict = await mergeGroupV1Record(storageID, storageRecord.groupV1); + mergeResult = await mergeGroupV1Record( + storageID, + storageVersion, + storageRecord.groupV1 + ); } else if (itemType === ITEM_TYPE.GROUPV2 && storageRecord.groupV2) { - hasConflict = await mergeGroupV2Record(storageID, storageRecord.groupV2); + mergeResult = await mergeGroupV2Record( + storageID, + storageVersion, + storageRecord.groupV2 + ); } else if (itemType === ITEM_TYPE.ACCOUNT && storageRecord.account) { - hasConflict = await mergeAccountRecord(storageID, storageRecord.account); + mergeResult = await mergeAccountRecord( + storageID, + storageVersion, + storageRecord.account + ); } else { isUnsupported = true; - log.info('storageService.mergeRecord: Unknown record:', itemType); + log.warn( + `storageService.merge(${redactStorageID( + storageID, + storageVersion + )}): unknown item type=${itemType}` + ); } + + const redactedID = redactStorageID( + storageID, + storageVersion, + mergeResult.conversation + ); + const oldID = mergeResult.oldStorageID + ? redactStorageID(mergeResult.oldStorageID, mergeResult.oldStorageVersion) + : '?'; log.info( - 'storageService.mergeRecord: merged', - redactStorageID(storageID), - itemType, - hasConflict + `storageService.merge(${redactedID}): merged item type=${itemType} ` + + `oldID=${oldID} ` + + `conflict=${mergeResult.hasConflict} ` + + `details=${JSON.stringify(mergeResult.details)}` ); } catch (err) { hasError = true; + const redactedID = redactStorageID(storageID, storageVersion); log.error( - 'storageService.mergeRecord: Error with', - redactStorageID(storageID), - itemType, - String(err) + `storageService.merge(${redactedID}): error with ` + + `item type=${itemType} ` + + `details=${Errors.toLogFormat(err)}` ); } return { - hasConflict, + hasConflict: mergeResult.hasConflict, hasError, isUnsupported, itemType, @@ -765,8 +802,9 @@ async function mergeRecord( } async function processManifest( - manifest: Proto.IManifestRecord -): Promise { + manifest: Proto.IManifestRecord, + version: number +): Promise { if (!window.textsecure.messaging) { throw new Error('storageService.processManifest: We are offline!'); } @@ -778,13 +816,13 @@ async function processManifest( }); const remoteKeys = new Set(remoteKeysTypeMap.keys()); - const localKeys: Set = new Set(); + const localVersions = new Map(); const conversations = window.getConversations(); conversations.forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); if (storageID) { - localKeys.add(storageID); + localVersions.set(storageID, conversation.get('storageVersion')); } }); @@ -794,33 +832,47 @@ async function processManifest( const stillUnknown = unknownRecordsArray.filter((record: UnknownRecord) => { // Do not include any unknown records that we already support if (!validRecordTypes.has(record.itemType)) { - localKeys.add(record.storageID); + localVersions.set(record.storageID, record.storageVersion); return false; } return true; }); - log.info( - 'storageService.processManifest: local records:', - conversations.length - ); - log.info('storageService.processManifest: local keys:', localKeys.size); - log.info( - 'storageService.processManifest: unknown records:', - stillUnknown.length - ); - log.info('storageService.processManifest: remote keys:', remoteKeys.size); - - const remoteOnlySet: Set = new Set(); - remoteKeys.forEach((key: string) => { - if (!localKeys.has(key)) { + const remoteOnlySet = new Set(); + for (const key of remoteKeys) { + if (!localVersions.has(key)) { remoteOnlySet.add(key); } - }); + } + + const localOnlySet = new Set(); + for (const key of localVersions.keys()) { + if (!remoteKeys.has(key)) { + localOnlySet.add(key); + } + } + + const redactedRemoteOnly = Array.from(remoteOnlySet).map(id => + redactStorageID(id, version) + ); + const redactedLocalOnly = Array.from(localOnlySet).map(id => + redactStorageID(id, localVersions.get(id)) + ); log.info( - 'storageService.processManifest: remote ids:', - Array.from(remoteOnlySet).map(redactStorageID).join(',') + `storageService.process(${version}): localRecords=${conversations.length} ` + + `localKeys=${localVersions.size} unknownKeys=${stillUnknown.length} ` + + `remoteKeys=${remoteKeys.size}` + ); + log.info( + `storageService.process(${version}): ` + + `remoteOnlyCount=${remoteOnlySet.size} ` + + `remoteOnlyKeys=${JSON.stringify(redactedRemoteOnly)}` + ); + log.info( + `storageService.process(${version}): ` + + `localOnlyCount=${localOnlySet.size} ` + + `localOnlyKeys=${JSON.stringify(redactedLocalOnly)}` ); const remoteOnlyRecords = new Map(); @@ -831,12 +883,11 @@ async function processManifest( }); }); - if (!remoteOnlyRecords.size) { - return false; + let conflictCount = 0; + if (remoteOnlyRecords.size) { + conflictCount = await processRemoteRecords(version, remoteOnlyRecords); } - const conflictCount = await processRemoteRecords(remoteOnlyRecords); - // Post-merge, if our local records contain any storage IDs that were not // present in the remote manifest then we'll need to clear it, generate a // new storageID for that record, and upload. @@ -845,20 +896,31 @@ async function processManifest( window.getConversations().forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); if (storageID && !remoteKeys.has(storageID)) { + const storageVersion = conversation.get('storageVersion'); + const missingKey = redactStorageID( + storageID, + storageVersion, + conversation + ); log.info( - 'storageService.processManifest: local key was not in remote manifest', - redactStorageID(storageID), - conversation.idForLogging() + `storageService.process(${version}): localKey=${missingKey} was not ` + + 'in remote manifest' ); conversation.unset('storageID'); + conversation.unset('storageVersion'); updateConversation(conversation.attributes); } }); - return conflictCount !== 0; + log.info( + `storageService.process(${version}): conflictCount=${conflictCount}` + ); + + return conflictCount; } async function processRemoteRecords( + storageVersion: number, remoteOnlyRecords: Map ): Promise { const storageKeyBase64 = window.storage.get('storageKey'); @@ -868,8 +930,8 @@ async function processRemoteRecords( const storageKey = Bytes.fromBase64(storageKeyBase64); log.info( - 'storageService.processRemoteRecords: remote only keys', - remoteOnlyRecords.size + `storageService.process(${storageVersion}): fetching remote keys ` + + `count=${remoteOnlyRecords.size}` ); const readOperation = new Proto.ReadOperation(); @@ -888,11 +950,6 @@ async function processRemoteRecords( const storageItems = Proto.StorageItems.decode(storageItemsBuffer); - if (!storageItems.items) { - log.info('storageService.processRemoteRecords: No storage items retrieved'); - return 0; - } - const decryptedStorageItems = await pMap( storageItems.items, async ( @@ -901,13 +958,12 @@ async function processRemoteRecords( const { key, value: storageItemCiphertext } = storageRecordWrapper; if (!key || !storageItemCiphertext) { - log.error( - 'storageService.processRemoteRecords: No key or Ciphertext available' - ); - await stopStorageServiceSync(); - throw new Error( - 'storageService.processRemoteRecords: Missing key and/or Ciphertext' + const error = new Error( + `storageService.process(${storageVersion}): ` + + 'missing key and/or Ciphertext' ); + await stopStorageServiceSync(error); + throw error; } const base64ItemID = Bytes.toBase64(key); @@ -922,9 +978,11 @@ async function processRemoteRecords( ); } catch (err) { log.error( - 'storageService.processRemoteRecords: Error decrypting storage item' + `storageService.process(${storageVersion}): ` + + 'Error decrypting storage item', + Errors.toLogFormat(err) ); - await stopStorageServiceSync(); + await stopStorageServiceSync(err); throw err; } @@ -957,24 +1015,28 @@ async function processRemoteRecords( try { log.info( - `storageService.processRemoteRecords: Attempting to merge ${sortedStorageItems.length} records` + `storageService.process(${storageVersion}): ` + + `attempting to merge records=${sortedStorageItems.length}` + ); + const mergedRecords = await pMap( + sortedStorageItems, + (item: MergeableItemType) => mergeRecord(storageVersion, item), + { concurrency: 5 } ); - const mergedRecords = await pMap(sortedStorageItems, mergeRecord, { - concurrency: 5, - }); log.info( - `storageService.processRemoteRecords: Processed ${mergedRecords.length} records` + `storageService.process(${storageVersion}): ` + + `processed records=${mergedRecords.length}` ); // Collect full map of previously and currently unknown records const unknownRecords: Map = new Map(); - const unknownRecordsArray: ReadonlyArray = + const previousUnknownRecords: ReadonlyArray = window.storage.get( 'storage-service-unknown-records', new Array() ); - unknownRecordsArray.forEach((record: UnknownRecord) => { + previousUnknownRecords.forEach((record: UnknownRecord) => { unknownRecords.set(record.storageID, record); }); @@ -987,11 +1049,13 @@ async function processRemoteRecords( unknownRecords.set(mergedRecord.storageID, { itemType: mergedRecord.itemType, storageID: mergedRecord.storageID, + storageVersion, }); } else if (mergedRecord.hasError) { newRecordsWithErrors.push({ itemType: mergedRecord.itemType, storageID: mergedRecord.storageID, + storageVersion, }); } @@ -1004,36 +1068,46 @@ async function processRemoteRecords( const newUnknownRecords = Array.from(unknownRecords.values()).filter( (record: UnknownRecord) => !validRecordTypes.has(record.itemType) ); - - log.info( - 'storageService.processRemoteRecords: Unknown records found:', - newUnknownRecords.length + const redactedNewUnknowns = newUnknownRecords.map( + unknownRecordToRedactedID ); - window.storage.put('storage-service-unknown-records', newUnknownRecords); log.info( - 'storageService.processRemoteRecords: Records with errors:', - newRecordsWithErrors.length + `storageService.process(${storageVersion}): ` + + `unknown records=${JSON.stringify(redactedNewUnknowns)} ` + + `count=${redactedNewUnknowns.length}` + ); + await window.storage.put( + 'storage-service-unknown-records', + newUnknownRecords + ); + + const redactedErrorRecords = newRecordsWithErrors.map( + unknownRecordToRedactedID + ); + log.info( + `storageService.process(${storageVersion}): ` + + `error records=${JSON.stringify(redactedErrorRecords)} ` + + `count=${redactedErrorRecords.length}` ); // Refresh the list of records that had errors with every push, that way // this list doesn't grow unbounded and we keep the list of storage keys // fresh. - window.storage.put('storage-service-error-records', newRecordsWithErrors); + await window.storage.put( + 'storage-service-error-records', + newRecordsWithErrors + ); - if (conflictCount !== 0) { - log.info( - 'storageService.processRemoteRecords: ' + - `${conflictCount} conflicts found, uploading changes` - ); - - return conflictCount; + if (conflictCount === 0) { + conflictBackOff.reset(); } - conflictBackOff.reset(); + return conflictCount; } catch (err) { log.error( - 'storageService.processRemoteRecords: failed!', - err && err.stack ? err.stack : String(err) + `storageService.process(${storageVersion}): ` + + 'failed to process remote records', + Errors.toLogFormat(err) ); } @@ -1060,12 +1134,17 @@ async function sync( const localManifestVersion = manifestFromStorage || 0; - log.info(`storageService.sync: fetching ${localManifestVersion}`); + log.info( + 'storageService.sync: fetching latest ' + + `after version=${localManifestVersion}` + ); manifest = await fetchManifest(localManifestVersion); // Guarding against no manifests being returned, everything should be ok if (!manifest) { - log.info('storageService.sync: no new manifest'); + log.info( + `storageService.sync: no updates, version=${localManifestVersion}` + ); return undefined; } @@ -1076,25 +1155,30 @@ async function sync( const version = normalizeNumber(manifest.version); log.info( - `storageService.sync: manifest versions - previous: ${localManifestVersion}, current: ${version}` + `storageService.sync: updating to remoteVersion=${version} from ` + + `version=${localManifestVersion}` ); - const hasConflicts = await processManifest(manifest); + const conflictCount = await processManifest(manifest, version); - log.info(`storageService.sync: storing new manifest version ${version}`); + log.info( + `storageService.sync: updated to version=${version} ` + + `conflicts=${conflictCount}` + ); - window.storage.put('manifestVersion', version); + await window.storage.put('manifestVersion', version); + const hasConflicts = conflictCount !== 0; if (hasConflicts && !ignoreConflicts) { await upload(true); } // We now know that we've successfully completed a storage service fetch - window.storage.put('storageFetchComplete', true); + await window.storage.put('storageFetchComplete', true); } catch (err) { log.error( 'storageService.sync: error processing manifest', - err && err.stack ? err.stack : String(err) + Errors.toLogFormat(err) ); } @@ -1180,10 +1264,7 @@ async function upload(fromSync = false): Promise { setTimeout(runStorageServiceSyncJob); return; } - log.error( - 'storageService.upload', - err && err.stack ? err.stack : String(err) - ); + log.error('storageService.upload', Errors.toLogFormat(err)); } } diff --git a/ts/services/storageRecordOps.ts b/ts/services/storageRecordOps.ts index 163b9cb68..e9f0c08c0 100644 --- a/ts/services/storageRecordOps.ts +++ b/ts/services/storageRecordOps.ts @@ -50,6 +50,19 @@ type RecordClass = | Proto.IGroupV1Record | Proto.IGroupV2Record; +export type MergeResultType = Readonly<{ + hasConflict: boolean; + conversation?: ConversationModel; + oldStorageID?: string; + oldStorageVersion?: number; + details: ReadonlyArray; +}>; + +type HasConflictResultType = Readonly<{ + hasConflict: boolean; + details: ReadonlyArray; +}>; + function toRecordVerified(verified: number): Proto.ContactRecord.IdentityState { const VERIFIED_ENUM = window.textsecure.storage.protocol.VerifiedStatus; const STATE_ENUM = Proto.ContactRecord.IdentityState; @@ -66,13 +79,11 @@ function toRecordVerified(verified: number): Proto.ContactRecord.IdentityState { function addUnknownFields( record: RecordClass, - conversation: ConversationModel + conversation: ConversationModel, + details: Array ): void { if (record.__unknownFields) { - log.info( - 'storageService.addUnknownFields: Unknown fields found for', - conversation.idForLogging() - ); + details.push('adding unknown fields'); conversation.set({ storageUnknownFields: Bytes.toBase64( Bytes.concatenate(record.__unknownFields) @@ -81,10 +92,7 @@ function addUnknownFields( } else if (conversation.get('storageUnknownFields')) { // If the record doesn't have unknown fields attached but we have them // saved locally then we need to clear it out - log.info( - 'storageService.addUnknownFields: Clearing unknown fields for', - conversation.idForLogging() - ); + details.push('clearing unknown fields'); conversation.unset('storageUnknownFields'); } } @@ -97,7 +105,7 @@ function applyUnknownFields( if (storageUnknownFields) { log.info( 'storageService.applyUnknownFields: Applying unknown fields for', - conversation.get('id') + conversation.idForLogging() ); // eslint-disable-next-line no-param-reassign record.__unknownFields = [Bytes.fromBase64(storageUnknownFields)]; @@ -292,11 +300,6 @@ export async function toAccountRecord( pinnedConversationClass !== undefined ); - log.info( - 'storageService.toAccountRecord: pinnedConversations', - pinnedConversations.length - ); - accountRecord.pinnedConversations = pinnedConversations; const subscriberId = window.storage.get('subscriberId'); @@ -398,12 +401,11 @@ type RecordClassObject = { function doRecordsConflict( localRecord: RecordClassObject, - remoteRecord: RecordClassObject, - conversation: ConversationModel -): boolean { - const idForLogging = conversation.idForLogging(); + remoteRecord: RecordClassObject +): HasConflictResultType { + const details = new Array(); - return Object.keys(remoteRecord).some((key: string): boolean => { + for (const key of Object.keys(remoteRecord)) { const localValue = localRecord[key]; const remoteValue = remoteRecord[key]; @@ -412,52 +414,37 @@ function doRecordsConflict( if (localValue instanceof Uint8Array) { const areEqual = Bytes.areEqual(localValue, remoteValue); if (!areEqual) { - log.info( - 'storageService.doRecordsConflict: Conflict found for Uint8Array', - key, - idForLogging - ); + details.push(`key=${key}: different bytes`); } - return !areEqual; + continue; } // If both types are Long we can use Long's equals to compare them if (Long.isLong(localValue) || typeof localValue === 'number') { if (!Long.isLong(remoteValue) && typeof remoteValue !== 'number') { - log.info( - 'storageService.doRecordsConflict: Conflict found, remote value ' + - 'is not a number', - key - ); - return true; + details.push(`key=${key}: type mismatch`); + continue; } const areEqual = Long.fromValue(localValue).equals( Long.fromValue(remoteValue) ); if (!areEqual) { - log.info( - 'storageService.doRecordsConflict: Conflict found for Long', - key, - idForLogging - ); + details.push(`key=${key}: different integers`); } - return !areEqual; + continue; } if (key === 'pinnedConversations') { const areEqual = arePinnedConversationsEqual(localValue, remoteValue); if (!areEqual) { - log.info( - 'storageService.doRecordsConflict: Conflict found for pinnedConversations', - idForLogging - ); + details.push('pinnedConversations'); } - return !areEqual; + continue; } if (localValue === remoteValue) { - return false; + continue; } // Sometimes we get `null` values from Protobuf and they should default to @@ -470,56 +457,59 @@ function doRecordsConflict( localValue === 0 || (Long.isLong(localValue) && localValue.toNumber() === 0)) ) { - return false; + continue; } const areEqual = isEqual(localValue, remoteValue); if (!areEqual) { - log.info( - 'storageService.doRecordsConflict: Conflict found for', - key, - idForLogging - ); + details.push(`key=${key}: different values`); } + } - return !areEqual; - }); + return { + hasConflict: details.length > 0, + details, + }; } function doesRecordHavePendingChanges( mergedRecord: RecordClass, serviceRecord: RecordClass, conversation: ConversationModel -): boolean { +): HasConflictResultType { const shouldSync = Boolean(conversation.get('needsStorageServiceSync')); if (!shouldSync) { - return false; + return { hasConflict: false, details: [] }; } - const hasConflict = doRecordsConflict( + const { hasConflict, details } = doRecordsConflict( mergedRecord, - serviceRecord, - conversation + serviceRecord ); if (!hasConflict) { conversation.set({ needsStorageServiceSync: false }); } - return hasConflict; + return { + hasConflict, + details, + }; } export async function mergeGroupV1Record( storageID: string, + storageVersion: number, groupV1Record: Proto.IGroupV1Record -): Promise { +): Promise { if (!groupV1Record.id) { throw new Error(`No ID for ${storageID}`); } const groupId = Bytes.toBinary(groupV1Record.id); + let details = new Array(); // Attempt to fetch an existing group pertaining to the `groupId` or create // a new group and populate it with the attributes from the record. @@ -544,18 +534,13 @@ export async function mergeGroupV1Record( const fields = deriveGroupFields(masterKeyBuffer); const derivedGroupV2Id = Bytes.toBase64(fields.id); - log.info( - 'storageService.mergeGroupV1Record: failed to find group by v1 id ' + + details.push( + 'failed to find group by v1 id ' + `attempting lookup by v2 groupv2(${derivedGroupV2Id})` ); conversation = window.ConversationController.get(derivedGroupV2Id); } - if (conversation) { - log.info( - 'storageService.mergeGroupV1Record: found existing group', - conversation.idForLogging() - ); - } else { + if (!conversation) { if (groupV1Record.id.byteLength !== 16) { throw new Error('Not a valid gv1'); } @@ -564,16 +549,17 @@ export async function mergeGroupV1Record( groupId, 'group' ); - log.info( - 'storageService.mergeGroupV1Record: created a new group locally', - conversation.idForLogging() - ); + details.push('created a new group locally'); } + const oldStorageID = conversation.get('storageID'); + const oldStorageVersion = conversation.get('storageVersion'); + conversation.set({ isArchived: Boolean(groupV1Record.archived), markedUnread: Boolean(groupV1Record.markedUnread), storageID, + storageVersion, }); conversation.setMuteExpiration( @@ -588,30 +574,35 @@ export async function mergeGroupV1Record( let hasPendingChanges: boolean; if (isGroupV1(conversation.attributes)) { - addUnknownFields(groupV1Record, conversation); + addUnknownFields(groupV1Record, conversation, details); - hasPendingChanges = doesRecordHavePendingChanges( + const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges( await toGroupV1Record(conversation), groupV1Record, conversation ); + + details = details.concat(extraDetails); + hasPendingChanges = hasConflict; } else { // We cannot preserve unknown fields if local group is V2 and the remote is // still V1, because the storageItem that we'll put into manifest will have // a different record type. - log.info( - 'storageService.mergeGroupV1Record marking v1' + - ' group for an update to v2', - conversation.idForLogging() - ); // We want to upgrade group in the storage after merging it. hasPendingChanges = true; + details.push('marking v1 group for an update to v2'); } updateConversation(conversation.attributes); - return hasPendingChanges; + return { + hasConflict: hasPendingChanges, + conversation, + oldStorageID, + oldStorageVersion, + details, + }; } async function getGroupV2Conversation( @@ -663,8 +654,9 @@ async function getGroupV2Conversation( export async function mergeGroupV2Record( storageID: string, + storageVersion: number, groupV2Record: Proto.IGroupV2Record -): Promise { +): Promise { if (!groupV2Record.masterKey) { throw new Error(`No master key for ${storageID}`); } @@ -672,7 +664,8 @@ export async function mergeGroupV2Record( const masterKeyBuffer = groupV2Record.masterKey; const conversation = await getGroupV2Conversation(masterKeyBuffer); - log.info('storageService.mergeGroupV2Record:', conversation.idForLogging()); + const oldStorageID = conversation.get('storageID'); + const oldStorageVersion = conversation.get('storageVersion'); conversation.set({ isArchived: Boolean(groupV2Record.archived), @@ -681,6 +674,7 @@ export async function mergeGroupV2Record( groupV2Record.dontNotifyForMentionsIfMuted ), storageID, + storageVersion, }); conversation.setMuteExpiration( @@ -692,14 +686,18 @@ export async function mergeGroupV2Record( applyMessageRequestState(groupV2Record, conversation); - addUnknownFields(groupV2Record, conversation); + let details = new Array(); - const hasPendingChanges = doesRecordHavePendingChanges( + addUnknownFields(groupV2Record, conversation, details); + + const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges( await toGroupV2Record(conversation), groupV2Record, conversation ); + details = details.concat(extraDetails); + updateConversation(conversation.attributes); const isGroupNewToUs = !isNumber(conversation.get('revision')); @@ -731,13 +729,20 @@ export async function mergeGroupV2Record( ); } - return hasPendingChanges; + return { + hasConflict, + conversation, + oldStorageID, + oldStorageVersion, + details, + }; } export async function mergeContactRecord( storageID: string, + storageVersion: number, originalContactRecord: Proto.IContactRecord -): Promise { +): Promise { const contactRecord = { ...originalContactRecord, @@ -754,11 +759,11 @@ export async function mergeContactRecord( // All contacts must have UUID if (!uuid) { - return false; + return { hasConflict: false, details: ['no uuid'] }; } if (!isValidUuid(uuid)) { - return false; + return { hasConflict: false, details: ['invalid uuid'] }; } const c = new window.Whisper.Conversation({ @@ -769,11 +774,10 @@ export async function mergeContactRecord( const validationError = c.validate(); if (validationError) { - log.error( - 'storageService.mergeContactRecord: invalid contact', - validationError - ); - return false; + return { + hasConflict: false, + details: [`validation error=${validationError}`], + }; } const id = window.ConversationController.ensureContactIds({ @@ -792,8 +796,6 @@ export async function mergeContactRecord( 'private' ); - log.info('storageService.mergeContactRecord:', conversation.idForLogging()); - if (contactRecord.profileKey) { await conversation.setProfileKey(Bytes.toBase64(contactRecord.profileKey), { viaStorageServiceSync: true, @@ -823,12 +825,17 @@ export async function mergeContactRecord( applyMessageRequestState(contactRecord, conversation); - addUnknownFields(contactRecord, conversation); + let details = new Array(); + addUnknownFields(contactRecord, conversation, details); + + const oldStorageID = conversation.get('storageID'); + const oldStorageVersion = conversation.get('storageVersion'); conversation.set({ isArchived: Boolean(contactRecord.archived), markedUnread: Boolean(contactRecord.markedUnread), storageID, + storageVersion, }); conversation.setMuteExpiration( @@ -838,21 +845,30 @@ export async function mergeContactRecord( } ); - const hasPendingChanges = doesRecordHavePendingChanges( + const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges( await toContactRecord(conversation), contactRecord, conversation ); + details = details.concat(extraDetails); updateConversation(conversation.attributes); - return hasPendingChanges; + return { + hasConflict, + conversation, + oldStorageID, + oldStorageVersion, + details, + }; } export async function mergeAccountRecord( storageID: string, + storageVersion: number, accountRecord: Proto.IAccountRecord -): Promise { +): Promise { + let details = new Array(); const { avatarUrl, linkPreviews, @@ -911,7 +927,7 @@ export async function mergeAccountRecord( const localPreferredReactionEmoji = window.storage.get('preferredReactionEmoji') || []; if (!isEqual(localPreferredReactionEmoji, rawPreferredReactionEmoji)) { - log.info( + log.warn( 'storageService: remote and local preferredReactionEmoji do not match', localPreferredReactionEmoji.length, rawPreferredReactionEmoji.length @@ -970,7 +986,7 @@ export async function mergeAccountRecord( .filter(id => !modelPinnedConversationIds.includes(id)); if (missingStoragePinnedConversationIds.length !== 0) { - log.info( + log.warn( 'mergeAccountRecord: pinnedConversationIds in storage does not match pinned Conversation models' ); } @@ -986,13 +1002,9 @@ export async function mergeAccountRecord( ) ); - log.info( - 'storageService.mergeAccountRecord: Local pinned', - locallyPinnedConversations.length - ); - log.info( - 'storageService.mergeAccountRecord: Remote pinned', - pinnedConversations.length + details.push( + `local pinned=${locallyPinnedConversations.length}`, + `remote pinned=${pinnedConversations.length}` ); const remotelyPinnedConversationPromises = pinnedConversations.map( @@ -1041,14 +1053,9 @@ export async function mergeAccountRecord( ({ id }) => !remotelyPinnedConversationIds.includes(id) ); - log.info( - 'storageService.mergeAccountRecord: unpinning', - conversationsToUnpin.length - ); - - log.info( - 'storageService.mergeAccountRecord: pinning', - remotelyPinnedConversations.length + details.push( + `unpinning=${conversationsToUnpin.length}`, + `pinning=${remotelyPinnedConversations.length}` ); conversationsToUnpin.forEach(conversation => { @@ -1083,12 +1090,16 @@ export async function mergeAccountRecord( 'private' ); - addUnknownFields(accountRecord, conversation); + addUnknownFields(accountRecord, conversation, details); + + const oldStorageID = conversation.get('storageID'); + const oldStorageVersion = conversation.get('storageVersion'); conversation.set({ isArchived: Boolean(noteToSelfArchived), markedUnread: Boolean(noteToSelfMarkedUnread), storageID, + storageVersion, }); if (accountRecord.profileKey) { @@ -1100,7 +1111,7 @@ export async function mergeAccountRecord( window.storage.put('avatarUrl', avatarUrl); } - const hasPendingChanges = doesRecordHavePendingChanges( + const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges( await toAccountRecord(conversation), accountRecord, conversation @@ -1108,5 +1119,13 @@ export async function mergeAccountRecord( updateConversation(conversation.attributes); - return hasPendingChanges; + details = details.concat(extraDetails); + + return { + hasConflict, + conversation, + oldStorageID, + oldStorageVersion, + details, + }; }