From 17e6ec468e663e5b454777264bd841715b6ab36d Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Wed, 9 Jun 2021 15:28:54 -0700 Subject: [PATCH] Faster WebSocket reconnects --- js/views/inbox_view.js | 10 +- libtextsecure/test/fake_web_api.js | 7 +- libtextsecure/test/index.html | 1 - .../test/websocket-resources_test.js | 237 ---------------- main.js | 9 + preload.js | 9 + ts/background.ts | 45 ++- ts/components/NetworkStatus.stories.tsx | 19 +- ts/components/NetworkStatus.tsx | 7 +- ts/main/powerChannel.ts | 26 ++ ts/services/groupCredentialFetcher.ts | 24 +- ts/services/storage.ts | 66 ++--- ts/shims/socketStatus.ts | 6 +- ts/state/ducks/network.ts | 2 +- ts/state/selectors/network.ts | 8 +- ts/test-both/util/BackOff_test.ts | 45 +++ ts/test-electron/WebsocketResources_test.ts | 264 ++++++++++++++++++ ts/textsecure/AccountManager.ts | 196 ++++++------- ts/textsecure/MessageReceiver.ts | 127 +++++---- ts/textsecure/WebAPI.ts | 202 +++++++++----- ts/textsecure/WebSocket.ts | 22 -- ts/textsecure/WebsocketResources.ts | 241 ++++++++-------- ts/types/SocketStatus.ts | 8 +- ts/util/BackOff.ts | 33 +++ ts/window.d.ts | 3 +- 25 files changed, 940 insertions(+), 677 deletions(-) delete mode 100644 libtextsecure/test/websocket-resources_test.js create mode 100644 ts/main/powerChannel.ts create mode 100644 ts/test-both/util/BackOff_test.ts create mode 100644 ts/test-electron/WebsocketResources_test.ts delete mode 100644 ts/textsecure/WebSocket.ts create mode 100644 ts/util/BackOff.ts diff --git a/js/views/inbox_view.js b/js/views/inbox_view.js index e405bb051..ac652f598 100644 --- a/js/views/inbox_view.js +++ b/js/views/inbox_view.js @@ -168,15 +168,15 @@ this.interval = setInterval(() => { const status = window.getSocketStatus(); switch (status) { - case WebSocket.CONNECTING: + case 'CONNECTING': break; - case WebSocket.OPEN: + case 'OPEN': clearInterval(this.interval); // if we've connected, we can wait for real empty event this.interval = null; break; - case WebSocket.CLOSING: - case WebSocket.CLOSED: + case 'CLOSING': + case 'CLOSED': clearInterval(this.interval); this.interval = null; // if we failed to connect, we pretend we got an empty event @@ -184,7 +184,7 @@ break; default: window.log.warn( - 'startConnectionListener: Found unexpected socket status; calling onEmpty() manually.' + `startConnectionListener: Found unexpected socket status ${status}; calling onEmpty() manually.` ); this.onEmpty(); break; diff --git a/libtextsecure/test/fake_web_api.js b/libtextsecure/test/fake_web_api.js index 3c9864ca8..0d4b20270 100644 --- a/libtextsecure/test/fake_web_api.js +++ b/libtextsecure/test/fake_web_api.js @@ -14,7 +14,12 @@ const fakeAPI = { getAvatar: fakeCall, getDevices: fakeCall, // getKeysForIdentifier : fakeCall, - getMessageSocket: () => new window.MockSocket('ws://localhost:8081/'), + getMessageSocket: async () => ({ + on() {}, + removeListener() {}, + close() {}, + sendBytes() {}, + }), getMyKeys: fakeCall, getProfile: fakeCall, getProvisioningSocket: fakeCall, diff --git a/libtextsecure/test/index.html b/libtextsecure/test/index.html index 1c6c01df3..9c59b3f2b 100644 --- a/libtextsecure/test/index.html +++ b/libtextsecure/test/index.html @@ -37,7 +37,6 @@ - diff --git a/libtextsecure/test/websocket-resources_test.js b/libtextsecure/test/websocket-resources_test.js deleted file mode 100644 index a2fad95f6..000000000 --- a/libtextsecure/test/websocket-resources_test.js +++ /dev/null @@ -1,237 +0,0 @@ -// Copyright 2015-2020 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -describe('WebSocket-Resource', () => { - describe('requests and responses', () => { - it('receives requests and sends responses', done => { - // mock socket - const requestId = '1'; - const socket = { - send(data) { - const message = window.textsecure.protobuf.WebSocketMessage.decode( - data - ); - assert.strictEqual( - message.type, - window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE - ); - assert.strictEqual(message.response.message, 'OK'); - assert.strictEqual(message.response.status, 200); - assert.strictEqual(message.response.id.toString(), requestId); - done(); - }, - addEventListener() {}, - }; - - // actual test - this.resource = new window.textsecure.WebSocketResource(socket, { - handleRequest(request) { - assert.strictEqual(request.verb, 'PUT'); - assert.strictEqual(request.path, '/some/path'); - assertEqualArrayBuffers( - request.body.toArrayBuffer(), - window.Signal.Crypto.typedArrayToArrayBuffer( - new Uint8Array([1, 2, 3]) - ) - ); - request.respond(200, 'OK'); - }, - }); - - // mock socket request - socket.onmessage({ - data: new Blob([ - new window.textsecure.protobuf.WebSocketMessage({ - type: window.textsecure.protobuf.WebSocketMessage.Type.REQUEST, - request: { - id: requestId, - verb: 'PUT', - path: '/some/path', - body: window.Signal.Crypto.typedArrayToArrayBuffer( - new Uint8Array([1, 2, 3]) - ), - }, - }) - .encode() - .toArrayBuffer(), - ]), - }); - }); - - it('sends requests and receives responses', done => { - // mock socket and request handler - let requestId; - const socket = { - send(data) { - const message = window.textsecure.protobuf.WebSocketMessage.decode( - data - ); - assert.strictEqual( - message.type, - window.textsecure.protobuf.WebSocketMessage.Type.REQUEST - ); - assert.strictEqual(message.request.verb, 'PUT'); - assert.strictEqual(message.request.path, '/some/path'); - assertEqualArrayBuffers( - message.request.body.toArrayBuffer(), - window.Signal.Crypto.typedArrayToArrayBuffer( - new Uint8Array([1, 2, 3]) - ) - ); - requestId = message.request.id; - }, - addEventListener() {}, - }; - - // actual test - const resource = new window.textsecure.WebSocketResource(socket); - resource.sendRequest({ - verb: 'PUT', - path: '/some/path', - body: window.Signal.Crypto.typedArrayToArrayBuffer( - new Uint8Array([1, 2, 3]) - ), - error: done, - success(message, status) { - assert.strictEqual(message, 'OK'); - assert.strictEqual(status, 200); - done(); - }, - }); - - // mock socket response - socket.onmessage({ - data: new Blob([ - new window.textsecure.protobuf.WebSocketMessage({ - type: window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE, - response: { id: requestId, message: 'OK', status: 200 }, - }) - .encode() - .toArrayBuffer(), - ]), - }); - }); - }); - - describe('close', () => { - before(() => { - window.WebSocket = MockSocket; - }); - after(() => { - window.WebSocket = WebSocket; - }); - it('closes the connection', done => { - const mockServer = new MockServer('ws://localhost:8081'); - mockServer.on('connection', server => { - server.on('close', done); - }); - const resource = new window.textsecure.WebSocketResource( - new WebSocket('ws://localhost:8081') - ); - resource.close(); - }); - }); - - describe.skip('with a keepalive config', function thisNeeded() { - before(() => { - window.WebSocket = MockSocket; - }); - after(() => { - window.WebSocket = WebSocket; - }); - this.timeout(60000); - it('sends keepalives once a minute', done => { - const mockServer = new MockServer('ws://localhost:8081'); - mockServer.on('connection', server => { - server.on('message', data => { - const message = window.textsecure.protobuf.WebSocketMessage.decode( - data - ); - assert.strictEqual( - message.type, - window.textsecure.protobuf.WebSocketMessage.Type.REQUEST - ); - assert.strictEqual(message.request.verb, 'GET'); - assert.strictEqual(message.request.path, '/v1/keepalive'); - server.close(); - done(); - }); - }); - this.resource = new window.textsecure.WebSocketResource( - new WebSocket('ws://loc1alhost:8081'), - { - keepalive: { path: '/v1/keepalive' }, - } - ); - }); - - it('uses / as a default path', done => { - const mockServer = new MockServer('ws://localhost:8081'); - mockServer.on('connection', server => { - server.on('message', data => { - const message = window.textsecure.protobuf.WebSocketMessage.decode( - data - ); - assert.strictEqual( - message.type, - window.textsecure.protobuf.WebSocketMessage.Type.REQUEST - ); - assert.strictEqual(message.request.verb, 'GET'); - assert.strictEqual(message.request.path, '/'); - server.close(); - done(); - }); - }); - this.resource = new window.textsecure.WebSocketResource( - new WebSocket('ws://localhost:8081'), - { - keepalive: true, - } - ); - }); - - it('optionally disconnects if no response', function thisNeeded1(done) { - this.timeout(65000); - const mockServer = new MockServer('ws://localhost:8081'); - const socket = new WebSocket('ws://localhost:8081'); - mockServer.on('connection', server => { - server.on('close', done); - }); - this.resource = new window.textsecure.WebSocketResource(socket, { - keepalive: true, - }); - }); - - it('allows resetting the keepalive timer', function thisNeeded2(done) { - this.timeout(65000); - const mockServer = new MockServer('ws://localhost:8081'); - const socket = new WebSocket('ws://localhost:8081'); - const startTime = Date.now(); - mockServer.on('connection', server => { - server.on('message', data => { - const message = window.textsecure.protobuf.WebSocketMessage.decode( - data - ); - assert.strictEqual( - message.type, - window.textsecure.protobuf.WebSocketMessage.Type.REQUEST - ); - assert.strictEqual(message.request.verb, 'GET'); - assert.strictEqual(message.request.path, '/'); - assert( - Date.now() > startTime + 60000, - 'keepalive time should be longer than a minute' - ); - server.close(); - done(); - }); - }); - const resource = new window.textsecure.WebSocketResource(socket, { - keepalive: true, - }); - setTimeout(() => { - resource.resetKeepAliveTimer(); - }, 5000); - }); - }); -}); diff --git a/main.js b/main.js index 3ed4d53e6..236c75930 100644 --- a/main.js +++ b/main.js @@ -123,6 +123,7 @@ const { } = require('./ts/types/Settings'); const { Environment } = require('./ts/environment'); const { ChallengeMainHandler } = require('./ts/main/challengeMain'); +const { PowerChannel } = require('./ts/main/powerChannel'); const { maybeParseUrl, setUrlSearchParams } = require('./ts/util/url'); const sql = new MainSQL(); @@ -1265,6 +1266,14 @@ app.on('ready', async () => { cleanupOrphanedAttachments, }); sqlChannels.initialize(sql); + PowerChannel.initialize({ + send(event) { + if (!mainWindow) { + return; + } + mainWindow.webContents.send(event); + }, + }); // Run window preloading in parallel with database initialization. await createWindow(); diff --git a/preload.js b/preload.js index 5ee63856d..7debc1baf 100644 --- a/preload.js +++ b/preload.js @@ -179,6 +179,15 @@ try { ipc.on('challenge:response', (_event, response) => { Whisper.events.trigger('challengeResponse', response); }); + + ipc.on('power-channel:suspend', () => { + Whisper.events.trigger('powerMonitorSuspend'); + }); + + ipc.on('power-channel:resume', () => { + Whisper.events.trigger('powerMonitorResume'); + }); + window.sendChallengeRequest = request => ipc.send('challenge:request', request); diff --git a/ts/background.ts b/ts/background.ts index b74591c73..f2b0ab0e7 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -11,6 +11,7 @@ import { DataMessageClass } from './textsecure.d'; import { MessageAttributesType } from './model-types.d'; import { WhatIsThis } from './window.d'; import { getTitleBarVisibility, TitleBarVisibility } from './types/Settings'; +import { SocketStatus } from './types/SocketStatus'; import { DEFAULT_CONVERSATION_COLOR } from './types/Colors'; import { ChallengeHandler } from './challenge'; import { isWindowDragElement } from './util/isWindowDragElement'; @@ -38,6 +39,7 @@ import { connectToServerWithStoredCredentials } from './util/connectToServerWith import * as universalExpireTimer from './util/universalExpireTimer'; import { isDirectConversation, isGroupV2 } from './util/whatTypeOfConversation'; import { getSendOptions } from './util/getSendOptions'; +import { BackOff } from './util/BackOff'; const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000; @@ -96,6 +98,15 @@ export async function startApp(): Promise { resolveOnAppView = resolve; }); + // Fibonacci timeouts + const reconnectBackOff = new BackOff([ + 5 * 1000, + 10 * 1000, + 15 * 1000, + 25 * 1000, + 40 * 1000, + ]); + window.textsecure.protobuf.onLoad(() => { window.storage.onready(() => { senderCertificateService.initialize({ @@ -302,15 +313,15 @@ export async function startApp(): Promise { }); let messageReceiver: WhatIsThis; - let preMessageReceiverStatus: WhatIsThis; + let preMessageReceiverStatus: SocketStatus | undefined; window.getSocketStatus = () => { if (messageReceiver) { return messageReceiver.getStatus(); } - if (window._.isNumber(preMessageReceiverStatus)) { + if (preMessageReceiverStatus) { return preMessageReceiverStatus; } - return WebSocket.CLOSED; + return SocketStatus.CLOSED; }; window.Whisper.events = window._.clone(window.Backbone.Events); let accountManager: typeof window.textsecure.AccountManager; @@ -1549,6 +1560,19 @@ export async function startApp(): Promise { } }); + window.Whisper.events.on('powerMonitorSuspend', () => { + window.log.info('powerMonitor: suspend'); + }); + + window.Whisper.events.on('powerMonitorResume', () => { + window.log.info('powerMonitor: resume'); + if (!messageReceiver) { + return; + } + + messageReceiver.checkSocket(); + }); + const reconnectToWebSocketQueue = new LatestQueue(); const enqueueReconnectToWebSocket = () => { @@ -1884,7 +1908,8 @@ export async function startApp(): Promise { function isSocketOnline() { const socketStatus = window.getSocketStatus(); return ( - socketStatus === WebSocket.CONNECTING || socketStatus === WebSocket.OPEN + socketStatus === SocketStatus.CONNECTING || + socketStatus === SocketStatus.OPEN ); } @@ -1937,7 +1962,7 @@ export async function startApp(): Promise { return; } - preMessageReceiverStatus = WebSocket.CONNECTING; + preMessageReceiverStatus = SocketStatus.CONNECTING; if (messageReceiver) { await messageReceiver.stopProcessing(); @@ -2020,7 +2045,7 @@ export async function startApp(): Promise { window.Signal.Services.initializeGroupCredentialFetcher(); - preMessageReceiverStatus = null; + preMessageReceiverStatus = undefined; // eslint-disable-next-line no-inner-declarations function addQueuedEventListener(name: string, handler: WhatIsThis) { @@ -2258,6 +2283,8 @@ export async function startApp(): Promise { // Intentionally not awaiting challengeHandler.onOnline(); + + reconnectBackOff.reset(); } finally { connecting = false; } @@ -3380,8 +3407,10 @@ export async function startApp(): Promise { ) { // Failed to connect to server if (navigator.onLine) { - window.log.info('retrying in 1 minute'); - reconnectTimer = setTimeout(connect, 60000); + const timeout = reconnectBackOff.getAndIncrement(); + + window.log.info(`retrying in ${timeout}ms`); + reconnectTimer = setTimeout(connect, timeout); window.Whisper.events.trigger('reconnectTimer'); diff --git a/ts/components/NetworkStatus.stories.tsx b/ts/components/NetworkStatus.stories.tsx index c219cef06..e8e0ed71f 100644 --- a/ts/components/NetworkStatus.stories.tsx +++ b/ts/components/NetworkStatus.stories.tsx @@ -7,6 +7,7 @@ import { boolean, select } from '@storybook/addon-knobs'; import { action } from '@storybook/addon-actions'; import { NetworkStatus } from './NetworkStatus'; +import { SocketStatus } from '../types/SocketStatus'; import { setup as setupI18n } from '../../js/modules/i18n'; import enMessages from '../../_locales/en/messages.json'; @@ -16,7 +17,7 @@ const defaultProps = { hasNetworkDialog: true, i18n, isOnline: true, - socketStatus: 0, + socketStatus: SocketStatus.CONNECTING, manualReconnect: action('manual-reconnect'), withinConnectingGracePeriod: false, challengeStatus: 'idle' as const, @@ -26,19 +27,19 @@ const permutations = [ { title: 'Connecting', props: { - socketStatus: 0, + socketStatus: SocketStatus.CONNECTING, }, }, { title: 'Closing (online)', props: { - socketStatus: 2, + socketStatus: SocketStatus.CLOSING, }, }, { title: 'Closed (online)', props: { - socketStatus: 3, + socketStatus: SocketStatus.CLOSED, }, }, { @@ -56,12 +57,12 @@ storiesOf('Components/NetworkStatus', module) const socketStatus = select( 'socketStatus', { - CONNECTING: 0, - OPEN: 1, - CLOSING: 2, - CLOSED: 3, + CONNECTING: SocketStatus.CONNECTING, + OPEN: SocketStatus.OPEN, + CLOSING: SocketStatus.CLOSING, + CLOSED: SocketStatus.CLOSED, }, - 0 + SocketStatus.CONNECTING ); return ( diff --git a/ts/components/NetworkStatus.tsx b/ts/components/NetworkStatus.tsx index a0813a092..eee1e3808 100644 --- a/ts/components/NetworkStatus.tsx +++ b/ts/components/NetworkStatus.tsx @@ -4,6 +4,7 @@ import React from 'react'; import { LocalizerType } from '../types/Util'; +import { SocketStatus } from '../types/SocketStatus'; import { NetworkStateType } from '../state/ducks/network'; const FIVE_SECONDS = 5 * 1000; @@ -100,12 +101,12 @@ export const NetworkStatus = ({ let renderActionableButton; switch (socketStatus) { - case WebSocket.CONNECTING: + case SocketStatus.CONNECTING: subtext = i18n('connectingHangOn'); title = i18n('connecting'); break; - case WebSocket.CLOSED: - case WebSocket.CLOSING: + case SocketStatus.CLOSED: + case SocketStatus.CLOSING: default: renderActionableButton = manualReconnectButton; title = i18n('disconnected'); diff --git a/ts/main/powerChannel.ts b/ts/main/powerChannel.ts new file mode 100644 index 000000000..773190755 --- /dev/null +++ b/ts/main/powerChannel.ts @@ -0,0 +1,26 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { powerMonitor } from 'electron'; + +export type InitializeOptions = { + send(event: string): void; +}; + +export class PowerChannel { + private static isInitialized = false; + + static initialize({ send }: InitializeOptions): void { + if (PowerChannel.isInitialized) { + throw new Error('PowerChannel already initialized'); + } + PowerChannel.isInitialized = true; + + powerMonitor.on('suspend', () => { + send('power-channel:suspend'); + }); + powerMonitor.on('resume', () => { + send('power-channel:resume'); + }); + } +} diff --git a/ts/services/groupCredentialFetcher.ts b/ts/services/groupCredentialFetcher.ts index 185ab082c..8e2e6572d 100644 --- a/ts/services/groupCredentialFetcher.ts +++ b/ts/services/groupCredentialFetcher.ts @@ -11,6 +11,7 @@ import { } from '../util/zkgroup'; import { GroupCredentialType } from '../textsecure/WebAPI'; +import { BackOff } from '../util/BackOff'; import { sleep } from '../util/sleep'; export const GROUP_CREDENTIALS_KEY = 'groupCredentials'; @@ -50,33 +51,28 @@ export async function initializeGroupCredentialFetcher(): Promise { await runWithRetry(maybeFetchNewCredentials, { scheduleAnother: 4 * HOUR }); } -type BackoffType = { - [key: number]: number | undefined; - max: number; -}; -const BACKOFF: BackoffType = { - 0: SECOND, - 1: 5 * SECOND, - 2: 30 * SECOND, - 3: 2 * MINUTE, - max: 5 * MINUTE, -}; +const BACKOFF_TIMEOUTS = [ + SECOND, + 5 * SECOND, + 30 * SECOND, + 2 * MINUTE, + 5 * MINUTE, +]; export async function runWithRetry( fn: () => Promise, options: { scheduleAnother?: number } = {} ): Promise { - let count = 0; + const backOff = new BackOff(BACKOFF_TIMEOUTS); // eslint-disable-next-line no-constant-condition while (true) { try { - count += 1; // eslint-disable-next-line no-await-in-loop await fn(); return; } catch (error) { - const wait = BACKOFF[count] || BACKOFF.max; + const wait = backOff.getAndIncrement(); window.log.info( `runWithRetry: ${fn.name} failed. Waiting ${wait}ms for retry. Error: ${error.stack}` ); diff --git a/ts/services/storage.ts b/ts/services/storage.ts index 472635004..e8d456c96 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -30,6 +30,7 @@ import { toGroupV2Record, } from './storageRecordOps'; import { ConversationModel } from '../models/conversations'; +import { BackOff } from '../util/BackOff'; import { storageJobQueue } from '../util/JobQueue'; import { sleep } from '../util/sleep'; import { isMoreRecentThan } from '../util/timestamp'; @@ -45,8 +46,6 @@ const { updateConversation, } = dataInterface; -let consecutiveStops = 0; -let consecutiveConflicts = 0; const uploadBucket: Array = []; const validRecordTypes = new Set([ @@ -57,24 +56,18 @@ const validRecordTypes = new Set([ 4, // ACCOUNT ]); -type BackoffType = { - [key: number]: number | undefined; - max: number; -}; const SECOND = 1000; const MINUTE = 60 * SECOND; -const BACKOFF: BackoffType = { - 0: SECOND, - 1: 5 * SECOND, - 2: 30 * SECOND, - 3: 2 * MINUTE, - max: 5 * MINUTE, -}; -function backOff(count: number) { - const ms = BACKOFF[count] || BACKOFF.max; - return sleep(ms); -} +const backOff = new BackOff([ + SECOND, + 5 * SECOND, + 30 * SECOND, + 2 * MINUTE, + 5 * MINUTE, +]); + +const conflictBackOff = new BackOff([SECOND, 5 * SECOND, 30 * SECOND]); function redactStorageID(storageID: string): string { return storageID.substring(0, 3); @@ -494,16 +487,15 @@ async function uploadManifest( ); if (err.code === 409) { - if (consecutiveConflicts > 3) { + if (conflictBackOff.isFull()) { window.log.error( 'storageService.uploadManifest: Exceeded maximum consecutive conflicts' ); return; } - consecutiveConflicts += 1; window.log.info( - `storageService.uploadManifest: Conflict found with v${version}, running sync job times(${consecutiveConflicts})` + `storageService.uploadManifest: Conflict found with v${version}, running sync job times(${conflictBackOff.getIndex()})` ); throw err; @@ -517,8 +509,8 @@ async function uploadManifest( version ); window.storage.put('manifestVersion', version); - consecutiveConflicts = 0; - consecutiveStops = 0; + conflictBackOff.reset(); + backOff.reset(); await window.textsecure.messaging.sendFetchManifestSyncMessage(); } @@ -527,21 +519,21 @@ async function stopStorageServiceSync() { await window.storage.remove('storageKey'); - if (consecutiveStops < 5) { - await backOff(consecutiveStops); + if (backOff.isFull()) { window.log.info( - 'storageService.stopStorageServiceSync: requesting new keys' + 'storageService.stopStorageServiceSync: too many consecutive stops' ); - consecutiveStops += 1; - setTimeout(() => { - if (!window.textsecure.messaging) { - throw new Error( - 'storageService.stopStorageServiceSync: We are offline!' - ); - } - window.textsecure.messaging.sendRequestKeySyncMessage(); - }); + return; } + + await sleep(backOff.getAndIncrement()); + window.log.info('storageService.stopStorageServiceSync: requesting new keys'); + setTimeout(() => { + if (!window.textsecure.messaging) { + throw new Error('storageService.stopStorageServiceSync: We are offline!'); + } + window.textsecure.messaging.sendRequestKeySyncMessage(); + }); } async function createNewManifest() { @@ -976,7 +968,7 @@ async function processRemoteRecords( return conflictCount; } - consecutiveConflicts = 0; + conflictBackOff.reset(); } catch (err) { window.log.error( 'storageService.processRemoteRecords: failed!', @@ -1082,7 +1074,7 @@ async function upload(fromSync = false): Promise { window.log.info( 'storageService.upload: no storageKey, requesting new keys' ); - consecutiveStops = 0; + backOff.reset(); await window.textsecure.messaging.sendRequestKeySyncMessage(); return; } @@ -1108,7 +1100,7 @@ async function upload(fromSync = false): Promise { await uploadManifest(version, generatedManifest); } catch (err) { if (err.code === 409) { - await backOff(consecutiveConflicts); + await sleep(conflictBackOff.getAndIncrement()); window.log.info('storageService.upload: pushing sync on the queue'); // The sync job will check for conflicts and as part of that conflict // check if an item needs sync and doesn't match with the remote record diff --git a/ts/shims/socketStatus.ts b/ts/shims/socketStatus.ts index 05424ea2c..55bb23dcc 100644 --- a/ts/shims/socketStatus.ts +++ b/ts/shims/socketStatus.ts @@ -1,7 +1,9 @@ -// Copyright 2020 Signal Messenger, LLC +// Copyright 2020-2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -export function getSocketStatus(): number { +import { SocketStatus } from '../types/SocketStatus'; + +export function getSocketStatus(): SocketStatus { const { getSocketStatus: getMessageReceiverStatus } = window; return getMessageReceiverStatus(); diff --git a/ts/state/ducks/network.ts b/ts/state/ducks/network.ts index e8d340b5f..66d2f8ba8 100644 --- a/ts/state/ducks/network.ts +++ b/ts/state/ducks/network.ts @@ -98,7 +98,7 @@ export const actions = { export function getEmptyState(): NetworkStateType { return { isOnline: navigator.onLine, - socketStatus: WebSocket.OPEN, + socketStatus: SocketStatus.OPEN, withinConnectingGracePeriod: true, challengeStatus: 'idle', }; diff --git a/ts/state/selectors/network.ts b/ts/state/selectors/network.ts index e594ef3d1..f39795408 100644 --- a/ts/state/selectors/network.ts +++ b/ts/state/selectors/network.ts @@ -6,6 +6,7 @@ import { createSelector } from 'reselect'; import { StateType } from '../reducer'; import { NetworkStateType } from '../ducks/network'; import { isDone } from '../../util/registration'; +import { SocketStatus } from '../../types/SocketStatus'; const getNetwork = (state: StateType): NetworkStateType => state.network; @@ -18,9 +19,10 @@ export const hasNetworkDialog = createSelector( ): boolean => isRegistrationDone && (!isOnline || - (socketStatus === WebSocket.CONNECTING && !withinConnectingGracePeriod) || - socketStatus === WebSocket.CLOSED || - socketStatus === WebSocket.CLOSING) + (socketStatus === SocketStatus.CONNECTING && + !withinConnectingGracePeriod) || + socketStatus === SocketStatus.CLOSED || + socketStatus === SocketStatus.CLOSING) ); export const isChallengePending = createSelector( diff --git a/ts/test-both/util/BackOff_test.ts b/ts/test-both/util/BackOff_test.ts new file mode 100644 index 000000000..3213791e4 --- /dev/null +++ b/ts/test-both/util/BackOff_test.ts @@ -0,0 +1,45 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { BackOff } from '../../util/BackOff'; + +describe('BackOff', () => { + it('should return increasing timeouts', () => { + const b = new BackOff([1, 2, 3]); + + assert.strictEqual(b.getIndex(), 0); + assert.strictEqual(b.isFull(), false); + + assert.strictEqual(b.get(), 1); + assert.strictEqual(b.getAndIncrement(), 1); + assert.strictEqual(b.get(), 2); + assert.strictEqual(b.getIndex(), 1); + assert.strictEqual(b.isFull(), false); + + assert.strictEqual(b.getAndIncrement(), 2); + assert.strictEqual(b.getIndex(), 2); + assert.strictEqual(b.isFull(), true); + + assert.strictEqual(b.getAndIncrement(), 3); + assert.strictEqual(b.getIndex(), 2); + assert.strictEqual(b.isFull(), true); + + assert.strictEqual(b.getAndIncrement(), 3); + assert.strictEqual(b.getIndex(), 2); + assert.strictEqual(b.isFull(), true); + }); + + it('should reset', () => { + const b = new BackOff([1, 2, 3]); + + assert.strictEqual(b.getAndIncrement(), 1); + assert.strictEqual(b.getAndIncrement(), 2); + + b.reset(); + + assert.strictEqual(b.getAndIncrement(), 1); + assert.strictEqual(b.getAndIncrement(), 2); + }); +}); diff --git a/ts/test-electron/WebsocketResources_test.ts b/ts/test-electron/WebsocketResources_test.ts new file mode 100644 index 000000000..00c6aadd5 --- /dev/null +++ b/ts/test-electron/WebsocketResources_test.ts @@ -0,0 +1,264 @@ +// Copyright 2015-2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable + class-methods-use-this, + no-new, + @typescript-eslint/no-empty-function, + @typescript-eslint/no-explicit-any + */ + +import { assert } from 'chai'; +import * as sinon from 'sinon'; +import EventEmitter from 'events'; +import { connection as WebSocket } from 'websocket'; + +import WebSocketResource from '../textsecure/WebsocketResources'; + +describe('WebSocket-Resource', () => { + class FakeSocket extends EventEmitter { + public sendBytes(_: Uint8Array) {} + + public close() {} + } + + describe('requests and responses', () => { + it('receives requests and sends responses', done => { + // mock socket + const requestId = '1'; + const socket = new FakeSocket(); + + sinon.stub(socket, 'sendBytes').callsFake((data: Uint8Array) => { + const message = window.textsecure.protobuf.WebSocketMessage.decode( + data + ); + assert.strictEqual( + message.type, + window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE + ); + assert.strictEqual(message.response?.message, 'OK'); + assert.strictEqual(message.response?.status, 200); + assert.strictEqual(message.response?.id.toString(), requestId); + done(); + }); + + // actual test + new WebSocketResource(socket as WebSocket, { + handleRequest(request: any) { + assert.strictEqual(request.verb, 'PUT'); + assert.strictEqual(request.path, '/some/path'); + assert.ok( + window.Signal.Crypto.constantTimeEqual( + request.body.toArrayBuffer(), + window.Signal.Crypto.typedArrayToArrayBuffer( + new Uint8Array([1, 2, 3]) + ) + ) + ); + request.respond(200, 'OK'); + }, + }); + + // mock socket request + socket.emit('message', { + type: 'binary', + binaryData: new Uint8Array( + new window.textsecure.protobuf.WebSocketMessage({ + type: window.textsecure.protobuf.WebSocketMessage.Type.REQUEST, + request: { + id: requestId, + verb: 'PUT', + path: '/some/path', + body: window.Signal.Crypto.typedArrayToArrayBuffer( + new Uint8Array([1, 2, 3]) + ), + }, + }) + .encode() + .toArrayBuffer() + ), + }); + }); + + it('sends requests and receives responses', done => { + // mock socket and request handler + let requestId: Long | undefined; + const socket = new FakeSocket(); + + sinon.stub(socket, 'sendBytes').callsFake((data: Uint8Array) => { + const message = window.textsecure.protobuf.WebSocketMessage.decode( + data + ); + assert.strictEqual( + message.type, + window.textsecure.protobuf.WebSocketMessage.Type.REQUEST + ); + assert.strictEqual(message.request?.verb, 'PUT'); + assert.strictEqual(message.request?.path, '/some/path'); + assert.ok( + window.Signal.Crypto.constantTimeEqual( + message.request?.body.toArrayBuffer(), + window.Signal.Crypto.typedArrayToArrayBuffer( + new Uint8Array([1, 2, 3]) + ) + ) + ); + requestId = message.request?.id; + }); + + // actual test + const resource = new WebSocketResource(socket as WebSocket); + resource.sendRequest({ + verb: 'PUT', + path: '/some/path', + body: window.Signal.Crypto.typedArrayToArrayBuffer( + new Uint8Array([1, 2, 3]) + ), + error: done, + success(message: string, status: number) { + assert.strictEqual(message, 'OK'); + assert.strictEqual(status, 200); + done(); + }, + }); + + // mock socket response + socket.emit('message', { + type: 'binary', + binaryData: new Uint8Array( + new window.textsecure.protobuf.WebSocketMessage({ + type: window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE, + response: { id: requestId, message: 'OK', status: 200 }, + }) + .encode() + .toArrayBuffer() + ), + }); + }); + }); + + describe('close', () => { + it('closes the connection', done => { + const socket = new FakeSocket(); + + sinon.stub(socket, 'close').callsFake(() => done()); + + const resource = new WebSocketResource(socket as WebSocket); + resource.close(); + }); + }); + + describe('with a keepalive config', () => { + const NOW = Date.now(); + + beforeEach(function beforeEach() { + this.sandbox = sinon.createSandbox(); + this.clock = this.sandbox.useFakeTimers({ + now: NOW, + }); + }); + + afterEach(function afterEach() { + this.sandbox.restore(); + }); + + it('sends keepalives once a minute', function test(done) { + const socket = new FakeSocket(); + + sinon.stub(socket, 'sendBytes').callsFake(data => { + const message = window.textsecure.protobuf.WebSocketMessage.decode( + data + ); + assert.strictEqual( + message.type, + window.textsecure.protobuf.WebSocketMessage.Type.REQUEST + ); + assert.strictEqual(message.request?.verb, 'GET'); + assert.strictEqual(message.request?.path, '/v1/keepalive'); + done(); + }); + + new WebSocketResource(socket as WebSocket, { + keepalive: { path: '/v1/keepalive' }, + }); + + this.clock.next(); + }); + + it('uses / as a default path', function test(done) { + const socket = new FakeSocket(); + + sinon.stub(socket, 'sendBytes').callsFake(data => { + const message = window.textsecure.protobuf.WebSocketMessage.decode( + data + ); + assert.strictEqual( + message.type, + window.textsecure.protobuf.WebSocketMessage.Type.REQUEST + ); + assert.strictEqual(message.request?.verb, 'GET'); + assert.strictEqual(message.request?.path, '/'); + done(); + }); + + new WebSocketResource(socket as WebSocket, { + keepalive: true, + }); + + this.clock.next(); + }); + + it('optionally disconnects if no response', function thisNeeded1(done) { + const socket = new FakeSocket(); + + sinon.stub(socket, 'close').callsFake(() => done()); + + new WebSocketResource(socket as WebSocket, { + keepalive: true, + }); + + // One to trigger send + this.clock.next(); + + // Another to trigger send timeout + this.clock.next(); + }); + + it('allows resetting the keepalive timer', function thisNeeded2(done) { + const startTime = Date.now(); + + const socket = new FakeSocket(); + + sinon.stub(socket, 'sendBytes').callsFake(data => { + const message = window.textsecure.protobuf.WebSocketMessage.decode( + data + ); + assert.strictEqual( + message.type, + window.textsecure.protobuf.WebSocketMessage.Type.REQUEST + ); + assert.strictEqual(message.request?.verb, 'GET'); + assert.strictEqual(message.request?.path, '/'); + assert.strictEqual( + Date.now(), + startTime + 60000, + 'keepalive time should be one minute' + ); + done(); + }); + + const resource = new WebSocketResource(socket as WebSocket, { + keepalive: true, + }); + + setTimeout(() => { + resource.keepalive?.reset(); + }, 5000); + + // Trigger setTimeout above + this.clock.next(); + + // Trigger sendBytes + this.clock.next(); + }); + }); +}); diff --git a/ts/textsecure/AccountManager.ts b/ts/textsecure/AccountManager.ts index 4c5f7c46b..14814525f 100644 --- a/ts/textsecure/AccountManager.ts +++ b/ts/textsecure/AccountManager.ts @@ -199,109 +199,111 @@ export default class AccountManager extends EventTarget { const queueTask = this.queueTask.bind(this); const provisioningCipher = new ProvisioningCipher(); let gotProvisionEnvelope = false; - return provisioningCipher.getPublicKey().then( - async (pubKey: ArrayBuffer) => - new Promise((resolve, reject) => { - const socket = getSocket(); - socket.onclose = event => { - window.log.info('provisioning socket closed. Code:', event.code); - if (!gotProvisionEnvelope) { - reject(new Error('websocket closed')); + const pubKey = await provisioningCipher.getPublicKey(); + + const socket = await getSocket(); + + window.log.info('provisioning socket open'); + + return new Promise((resolve, reject) => { + socket.on('close', (code, reason) => { + window.log.info( + `provisioning socket closed. Code: ${code} Reason: ${reason}` + ); + if (!gotProvisionEnvelope) { + reject(new Error('websocket closed')); + } + }); + + const wsr = new WebSocketResource(socket, { + keepalive: { path: '/v1/keepalive/provisioning' }, + handleRequest(request: IncomingWebSocketRequest) { + if ( + request.path === '/v1/address' && + request.verb === 'PUT' && + request.body + ) { + const proto = window.textsecure.protobuf.ProvisioningUuid.decode( + request.body + ); + const { uuid } = proto; + if (!uuid) { + throw new Error('registerSecondDevice: expected a UUID'); } - }; - socket.onopen = () => { - window.log.info('provisioning socket open'); - }; - const wsr = new WebSocketResource(socket, { - keepalive: { path: '/v1/keepalive/provisioning' }, - handleRequest(request: IncomingWebSocketRequest) { - if ( - request.path === '/v1/address' && - request.verb === 'PUT' && - request.body - ) { - const proto = window.textsecure.protobuf.ProvisioningUuid.decode( - request.body - ); - const { uuid } = proto; - if (!uuid) { - throw new Error('registerSecondDevice: expected a UUID'); - } - const url = getProvisioningUrl(uuid, pubKey); + const url = getProvisioningUrl(uuid, pubKey); - if (window.CI) { - window.CI.setProvisioningURL(url); - } + if (window.CI) { + window.CI.setProvisioningURL(url); + } - setProvisioningUrl(url); - request.respond(200, 'OK'); - } else if ( - request.path === '/v1/message' && - request.verb === 'PUT' && - request.body - ) { - const envelope = window.textsecure.protobuf.ProvisionEnvelope.decode( - request.body, - 'binary' - ); - request.respond(200, 'OK'); - gotProvisionEnvelope = true; - wsr.close(); - resolve( - provisioningCipher - .decrypt(envelope) - .then(async provisionMessage => - queueTask(async () => - confirmNumber(provisionMessage.number).then( - async deviceName => { - if ( - typeof deviceName !== 'string' || - deviceName.length === 0 - ) { - throw new Error( - 'AccountManager.registerSecondDevice: Invalid device name' - ); - } - if ( - !provisionMessage.number || - !provisionMessage.provisioningCode || - !provisionMessage.identityKeyPair - ) { - throw new Error( - 'AccountManager.registerSecondDevice: Provision message was missing key data' - ); - } + setProvisioningUrl(url); + request.respond(200, 'OK'); + } else if ( + request.path === '/v1/message' && + request.verb === 'PUT' && + request.body + ) { + const envelope = window.textsecure.protobuf.ProvisionEnvelope.decode( + request.body, + 'binary' + ); + request.respond(200, 'OK'); + gotProvisionEnvelope = true; + wsr.close(); + resolve( + provisioningCipher + .decrypt(envelope) + .then(async provisionMessage => + queueTask(async () => + confirmNumber(provisionMessage.number).then( + async deviceName => { + if ( + typeof deviceName !== 'string' || + deviceName.length === 0 + ) { + throw new Error( + 'AccountManager.registerSecondDevice: Invalid device name' + ); + } + if ( + !provisionMessage.number || + !provisionMessage.provisioningCode || + !provisionMessage.identityKeyPair + ) { + throw new Error( + 'AccountManager.registerSecondDevice: Provision message was missing key data' + ); + } - return createAccount( - provisionMessage.number, - provisionMessage.provisioningCode, - provisionMessage.identityKeyPair, - provisionMessage.profileKey, - deviceName, - provisionMessage.userAgent, - provisionMessage.readReceipts, - { uuid: provisionMessage.uuid } - ) - .then(clearSessionsAndPreKeys) - .then(generateKeys) - .then(async (keys: GeneratedKeysType) => - registerKeys(keys).then(async () => - confirmKeys(keys) - ) - ) - .then(registrationDone); - } + return createAccount( + provisionMessage.number, + provisionMessage.provisioningCode, + provisionMessage.identityKeyPair, + provisionMessage.profileKey, + deviceName, + provisionMessage.userAgent, + provisionMessage.readReceipts, + { uuid: provisionMessage.uuid } ) - ) + .then(clearSessionsAndPreKeys) + .then(generateKeys) + .then(async (keys: GeneratedKeysType) => + registerKeys(keys).then(async () => + confirmKeys(keys) + ) + ) + .then(registrationDone); + } ) - ); - } else { - window.log.error('Unknown websocket message', request.path); - } - }, - }); - }) - ); + ) + ) + ); + } else { + window.log.error('Unknown websocket message', request.path); + } + }, + }); + }); } async refreshPreKeys() { diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 5e3a0e787..071c98d89 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -10,9 +10,10 @@ /* eslint-disable max-classes-per-file */ /* eslint-disable no-restricted-syntax */ -import { isNumber, map, omit, noop } from 'lodash'; +import { isNumber, map, omit } from 'lodash'; import PQueue from 'p-queue'; import { v4 as getGuid } from 'uuid'; +import { connection as WebSocket } from 'websocket'; import { z } from 'zod'; import { @@ -41,6 +42,7 @@ import { SignedPreKeys, } from '../LibSignalStores'; import { BatcherType, createBatcher } from '../util/batcher'; +import { sleep } from '../util/sleep'; import { parseIntOrThrow } from '../util/parseIntOrThrow'; import { Zone } from '../util/Zone'; import EventTarget from './EventTarget'; @@ -53,6 +55,7 @@ import Crypto from './Crypto'; import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto'; import { ContactBuffer, GroupBuffer } from './ContactsParser'; import { isByteBufferEmpty } from '../util/isByteBufferEmpty'; +import { SocketStatus } from '../types/SocketStatus'; import { AttachmentPointerClass, @@ -68,13 +71,12 @@ import { } from '../textsecure.d'; import { ByteBufferClass } from '../window.d'; -import { WebSocket } from './WebSocket'; - import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups'; const GROUPV1_ID_LENGTH = 16; const GROUPV2_ID_LENGTH = 32; const RETRY_TIMEOUT = 2 * 60 * 1000; +const RECONNECT_DELAY = 1 * 1000; const decryptionErrorTypeSchema = z .object({ @@ -169,7 +171,9 @@ enum TaskType { } class MessageReceiverInner extends EventTarget { - _onClose?: (ev: any) => Promise; + _onClose?: (code: number, reason: string) => Promise; + + _onError?: (error: Error) => Promise; appQueue: PQueue; @@ -185,7 +189,7 @@ class MessageReceiverInner extends EventTarget { deviceId?: number; - hasConnected?: boolean; + hasConnected = false; incomingQueue: PQueue; @@ -209,6 +213,8 @@ class MessageReceiverInner extends EventTarget { socket?: WebSocket; + socketStatus = SocketStatus.CLOSED; + stoppingProcessing?: boolean; username: string; @@ -304,7 +310,7 @@ class MessageReceiverInner extends EventTarget { static arrayBufferToStringBase64 = (arrayBuffer: ArrayBuffer): string => window.dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64'); - connect() { + async connect(): Promise { if (this.calledClose) { return; } @@ -322,17 +328,43 @@ class MessageReceiverInner extends EventTarget { this.hasConnected = true; - if (this.socket && this.socket.readyState !== WebSocket.CLOSED) { + if (this.socket && this.socket.connected) { this.socket.close(); + this.socket = undefined; if (this.wsr) { this.wsr.close(); + this.wsr = undefined; } } + this.socketStatus = SocketStatus.CONNECTING; + // initialize the socket and start listening for messages - this.socket = this.server.getMessageSocket(); - this.socket.onclose = this.onclose.bind(this); - this.socket.onerror = this.onerror.bind(this); - this.socket.onopen = this.onopen.bind(this); + try { + this.socket = await this.server.getMessageSocket(); + } catch (error) { + this.socketStatus = SocketStatus.CLOSED; + + const event = new Event('error'); + event.error = error; + await this.dispatchAndWait(event); + return; + } + + this.socketStatus = SocketStatus.OPEN; + + window.log.info('websocket open'); + window.logMessageReceiverConnect(); + + if (!this._onClose) { + this._onClose = this.onclose.bind(this); + } + if (!this._onError) { + this._onError = this.onerror.bind(this); + } + + this.socket.on('close', this._onClose); + this.socket.on('error', this._onError); + this.wsr = new WebSocketResource(this.socket, { handleRequest: this.handleRequest.bind(this), keepalive: { @@ -342,7 +374,6 @@ class MessageReceiverInner extends EventTarget { }); // Because sometimes the socket doesn't properly emit its close event - this._onClose = this.onclose.bind(this); if (this._onClose) { this.wsr.addEventListener('close', this._onClose); } @@ -362,9 +393,12 @@ class MessageReceiverInner extends EventTarget { shutdown() { if (this.socket) { - this.socket.onclose = noop; - this.socket.onerror = noop; - this.socket.onopen = noop; + if (this._onClose) { + this.socket.removeListener('close', this._onClose); + } + if (this._onError) { + this.socket.removeListener('error', this._onError); + } this.socket = undefined; } @@ -380,6 +414,7 @@ class MessageReceiverInner extends EventTarget { async close() { window.log.info('MessageReceiver.close()'); this.calledClose = true; + this.socketStatus = SocketStatus.CLOSING; // Our WebSocketResource instance will close the socket and emit a 'close' event // if the socket doesn't emit one quickly enough. @@ -392,13 +427,8 @@ class MessageReceiverInner extends EventTarget { return this.drain(); } - onopen() { - window.log.info('websocket open'); - window.logMessageReceiverConnect(); - } - - onerror() { - window.log.error('websocket error'); + async onerror(error: Error): Promise { + window.log.error('websocket error', error); } async dispatchAndWait(event: Event) { @@ -407,35 +437,41 @@ class MessageReceiverInner extends EventTarget { return Promise.resolve(); } - async onclose(ev: any) { + async onclose(code: number, reason: string): Promise { window.log.info( 'websocket closed', - ev.code, - ev.reason || '', + code, + reason || '', 'calledClose:', this.calledClose ); + this.socketStatus = SocketStatus.CLOSED; + this.shutdown(); if (this.calledClose) { - return Promise.resolve(); + return; } - if (ev.code === 3000) { - return Promise.resolve(); + if (code === 3000) { + return; } - if (ev.code === 3001) { + if (code === 3001) { this.onEmpty(); } - // possible 403 or network issue. Make an request to confirm - return this.server - .getDevices() - .then(this.connect.bind(this)) // No HTTP error? Reconnect - .catch(async e => { - const event = new Event('error'); - event.error = e; - return this.dispatchAndWait(event); - }); + + await sleep(RECONNECT_DELAY); + + // Try to reconnect (if there is an error - we'll get an + // `error` event from `connect()` and hit the retry backoff logic in + // `ts/background.ts`) + await this.connect(); + } + + checkSocket(): void { + if (this.wsr) { + this.wsr.forceKeepAlive(); + } } handleRequest(request: IncomingWebSocketRequest) { @@ -1076,14 +1112,8 @@ class MessageReceiverInner extends EventTarget { throw new Error('Received message with no content and no legacyMessage'); } - getStatus() { - if (this.socket) { - return this.socket.readyState; - } - if (this.hasConnected) { - return WebSocket.CLOSED; - } - return -1; + getStatus(): SocketStatus { + return this.socketStatus; } async onDeliveryReceipt(envelope: EnvelopeClass): Promise { @@ -2693,6 +2723,7 @@ export default class MessageReceiver { this.hasEmptied = inner.hasEmptied.bind(inner); this.removeEventListener = inner.removeEventListener.bind(inner); this.stopProcessing = inner.stopProcessing.bind(inner); + this.checkSocket = inner.checkSocket.bind(inner); this.unregisterBatchers = inner.unregisterBatchers.bind(inner); inner.connect(); @@ -2707,7 +2738,7 @@ export default class MessageReceiver { attachment: AttachmentPointerClass ) => Promise; - getStatus: () => number; + getStatus: () => SocketStatus; hasEmptied: () => boolean; @@ -2717,6 +2748,8 @@ export default class MessageReceiver { unregisterBatchers: () => void; + checkSocket: () => void; + getProcessedCount: () => number; static stringToArrayBuffer = MessageReceiverInner.stringToArrayBuffer; diff --git a/ts/textsecure/WebAPI.ts b/ts/textsecure/WebAPI.ts index 6201e2eaf..5fdb6f456 100644 --- a/ts/textsecure/WebAPI.ts +++ b/ts/textsecure/WebAPI.ts @@ -11,7 +11,7 @@ import fetch, { Response } from 'node-fetch'; import ProxyAgent from 'proxy-agent'; -import { Agent } from 'https'; +import { Agent, RequestOptions } from 'https'; import pProps from 'p-props'; import { compact, @@ -25,9 +25,11 @@ import { pki } from 'node-forge'; import is from '@sindresorhus/is'; import PQueue from 'p-queue'; import { v4 as getGuid } from 'uuid'; +import { client as WebSocketClient, connection as WebSocket } from 'websocket'; import { z } from 'zod'; import { Long } from '../window.d'; +import { assert } from '../util/assert'; import { getUserAgent } from '../util/getUserAgent'; import { toWebSafeBase64 } from '../util/webSafeBase64'; import { isPackIdValid, redactPackId } from '../../js/modules/stickers'; @@ -59,7 +61,6 @@ import { StorageServiceCredentials, } from '../textsecure.d'; -import { WebSocket } from './WebSocket'; import MessageSender from './SendMessage'; // Note: this will break some code that expects to be able to use err.response when a @@ -261,31 +262,85 @@ function _validateResponse(response: any, schema: any) { return true; } -function _createSocket( +export type ConnectSocketOptions = Readonly<{ + certificateAuthority: string; + proxyUrl?: string; + version: string; + timeout?: number; +}>; + +const TEN_SECONDS = 1000 * 10; + +async function _connectSocket( url: string, { certificateAuthority, proxyUrl, version, - }: { certificateAuthority: string; proxyUrl?: string; version: string } -) { - let requestOptions; + timeout = TEN_SECONDS, + }: ConnectSocketOptions +): Promise { + let tlsOptions: RequestOptions = { + ca: certificateAuthority, + }; if (proxyUrl) { - requestOptions = { - ca: certificateAuthority, + tlsOptions = { + ...tlsOptions, agent: new ProxyAgent(proxyUrl), }; - } else { - requestOptions = { - ca: certificateAuthority, - }; } + const headers = { 'User-Agent': getUserAgent(version), }; - return new WebSocket(url, undefined, undefined, headers, requestOptions, { + const client = new WebSocketClient({ + tlsOptions, maxReceivedFrameSize: 0x210000, }); + + client.connect(url, undefined, undefined, headers); + + const { stack } = new Error(); + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(new Error('Connection timed out')); + + client.abort(); + }, timeout); + + client.on('connect', socket => { + clearTimeout(timer); + resolve(socket); + }); + + client.on('httpResponse', async response => { + clearTimeout(timer); + + const statusCode = response.statusCode || -1; + await _handleStatusCode(statusCode); + + const error = makeHTTPError( + 'promiseAjax: invalid websocket response', + statusCode || -1, + {}, // headers + undefined, + stack + ); + + const translatedError = _translateError(error); + assert( + translatedError, + '`httpResponse` event cannot be emitted with 200 status code' + ); + + reject(translatedError); + }); + client.on('connectFailed', error => { + clearTimeout(timer); + reject(error); + }); + }); } const FIVE_MINUTES = 1000 * 60 * 5; @@ -403,6 +458,56 @@ function getHostname(url: string): string { return urlObject.hostname; } +async function _handleStatusCode( + status: number, + unauthenticated = false +): Promise { + if (status === 499) { + window.log.error('Got 499 from Signal Server. Build is expired.'); + await window.storage.put('remoteBuildExpiration', Date.now()); + window.reduxActions.expiration.hydrateExpirationStatus(true); + } + if (!unauthenticated && status === 401) { + window.log.error('Got 401 from Signal Server. We might be unlinked.'); + window.Whisper.events.trigger('mightBeUnlinked'); + } +} + +function _translateError(error: Error): Error | undefined { + const { code } = error; + if (code === 200) { + // Happens sometimes when we get no response. Might be nice to get 204 instead. + return undefined; + } + let message: string; + switch (code) { + case -1: + message = + 'Failed to connect to the server, please check your network connection.'; + break; + case 413: + message = 'Rate limit exceeded, please try again later.'; + break; + case 403: + message = 'Invalid code, please try again.'; + break; + case 417: + message = 'Number already registered.'; + break; + case 401: + message = + 'Invalid authentication, most likely someone re-registered and invalidated our registration.'; + break; + case 404: + message = 'Number is not registered.'; + break; + default: + message = 'The server rejected our query, please file a bug report.'; + } + error.message = `${message} (original: ${error.message})`; + return error; +} + async function _promiseAjax( providedUrl: string | null, options: PromiseAjaxOptionsType @@ -487,25 +592,11 @@ async function _promiseAjax( fetch(url, fetchOptions) .then(async response => { - if (options.serverUrl) { - if ( - response.status === 499 && - getHostname(options.serverUrl) === getHostname(url) - ) { - window.log.error('Got 499 from Signal Server. Build is expired.'); - await window.storage.put('remoteBuildExpiration', Date.now()); - window.reduxActions.expiration.hydrateExpirationStatus(true); - } - if ( - !unauthenticated && - response.status === 401 && - getHostname(options.serverUrl) === getHostname(url) - ) { - window.log.error( - 'Got 401 from Signal Server. We might be unlinked.' - ); - window.Whisper.events.trigger('mightBeUnlinked'); - } + if ( + options.serverUrl && + getHostname(options.serverUrl) === getHostname(url) + ) { + await _handleStatusCode(response.status, unauthenticated); } let resultPromise; @@ -863,7 +954,7 @@ export type WebAPIType = { deviceId?: number, options?: { accessKey?: string } ) => Promise; - getMessageSocket: () => WebSocket; + getMessageSocket: () => Promise; getMyKeys: () => Promise; getProfile: ( identifier: string, @@ -880,7 +971,7 @@ export type WebAPIType = { profileKeyCredentialRequest?: string; } ) => Promise; - getProvisioningSocket: () => WebSocket; + getProvisioningSocket: () => Promise; getSenderCertificate: ( withUuid?: boolean ) => Promise<{ certificate: string }>; @@ -1153,39 +1244,10 @@ export function initialize({ unauthenticated: param.unauthenticated, accessKey: param.accessKey, }).catch((e: Error) => { - const { code } = e; - if (code === 200) { - // Happens sometimes when we get no response. Might be nice to get 204 instead. - return null; + const translatedError = _translateError(e); + if (translatedError) { + throw translatedError; } - let message: string; - switch (code) { - case -1: - message = - 'Failed to connect to the server, please check your network connection.'; - break; - case 413: - message = 'Rate limit exceeded, please try again later.'; - break; - case 403: - message = 'Invalid code, please try again.'; - break; - case 417: - message = 'Number already registered.'; - break; - case 401: - message = - 'Invalid authentication, most likely someone re-registered and invalidated our registration.'; - break; - case 404: - message = 'Number is not registered.'; - break; - default: - message = - 'The server rejected our query, please file a bug report.'; - } - e.message = `${message} (original: ${e.message})`; - throw e; }); } @@ -2318,7 +2380,7 @@ export function initialize({ }; } - function getMessageSocket() { + function getMessageSocket(): Promise { window.log.info('opening message socket', url); const fixedScheme = url .replace('https://', 'wss://') @@ -2327,20 +2389,20 @@ export function initialize({ const pass = encodeURIComponent(password); const clientVersion = encodeURIComponent(version); - return _createSocket( + return _connectSocket( `${fixedScheme}/v1/websocket/?login=${login}&password=${pass}&agent=OWD&version=${clientVersion}`, { certificateAuthority, proxyUrl, version } ); } - function getProvisioningSocket() { + function getProvisioningSocket(): Promise { window.log.info('opening provisioning socket', url); const fixedScheme = url .replace('https://', 'wss://') .replace('http://', 'ws://'); const clientVersion = encodeURIComponent(version); - return _createSocket( + return _connectSocket( `${fixedScheme}/v1/websocket/provisioning/?agent=OWD&version=${clientVersion}`, { certificateAuthority, proxyUrl, version } ); diff --git a/ts/textsecure/WebSocket.ts b/ts/textsecure/WebSocket.ts deleted file mode 100644 index 80f733da4..000000000 --- a/ts/textsecure/WebSocket.ts +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2020 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { w3cwebsocket } from 'websocket'; - -type ModifiedEventSource = Omit; - -declare class ModifiedWebSocket - extends w3cwebsocket - implements ModifiedEventSource { - withCredentials: boolean; - - addEventListener: EventSource['addEventListener']; - - removeEventListener: EventSource['removeEventListener']; - - dispatchEvent: EventSource['dispatchEvent']; -} - -export type WebSocket = ModifiedWebSocket; -// eslint-disable-next-line @typescript-eslint/no-redeclare -export const WebSocket = w3cwebsocket as typeof ModifiedWebSocket; diff --git a/ts/textsecure/WebsocketResources.ts b/ts/textsecure/WebsocketResources.ts index 918381dd0..273ac08c1 100644 --- a/ts/textsecure/WebsocketResources.ts +++ b/ts/textsecure/WebsocketResources.ts @@ -27,12 +27,12 @@ * */ +import { connection as WebSocket, IMessage } from 'websocket'; + import { ByteBufferClass } from '../window.d'; import EventTarget from './EventTarget'; -import { WebSocket } from './WebSocket'; - class Request { verb: string; @@ -92,14 +92,13 @@ export class IncomingWebSocketRequest { this.headers = request.headers; this.respond = (status, message) => { - socket.send( - new window.textsecure.protobuf.WebSocketMessage({ - type: window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE, - response: { id: request.id, message, status }, - }) - .encode() - .toArrayBuffer() - ); + const ab = new window.textsecure.protobuf.WebSocketMessage({ + type: window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE, + response: { id: request.id, message, status }, + }) + .encode() + .toArrayBuffer(); + socket.sendBytes(Buffer.from(ab)); }; } } @@ -111,20 +110,19 @@ class OutgoingWebSocketRequest { constructor(options: any, socket: WebSocket) { const request = new Request(options); outgoing[request.id] = request; - socket.send( - new window.textsecure.protobuf.WebSocketMessage({ - type: window.textsecure.protobuf.WebSocketMessage.Type.REQUEST, - request: { - verb: request.verb, - path: request.path, - body: request.body, - headers: request.headers, - id: request.id, - }, - }) - .encode() - .toArrayBuffer() - ); + const ab = new window.textsecure.protobuf.WebSocketMessage({ + type: window.textsecure.protobuf.WebSocketMessage.Type.REQUEST, + request: { + verb: request.verb, + path: request.path, + body: request.body, + headers: request.headers, + id: request.id, + }, + }) + .encode() + .toArrayBuffer(); + socket.sendBytes(Buffer.from(ab)); } } @@ -149,66 +147,58 @@ export default class WebSocketResource extends EventTarget { this.sendRequest = options => new OutgoingWebSocketRequest(options, socket); // eslint-disable-next-line no-param-reassign - socket.onmessage = socketMessage => { - const blob = socketMessage.data; - const handleArrayBuffer = (buffer: ArrayBuffer) => { - const message = window.textsecure.protobuf.WebSocketMessage.decode( - buffer + const onMessage = ({ type, binaryData }: IMessage): void => { + if (type !== 'binary' || !binaryData) { + throw new Error(`Unsupported websocket message type: ${type}`); + } + + const message = window.textsecure.protobuf.WebSocketMessage.decode( + binaryData + ); + if ( + message.type === + window.textsecure.protobuf.WebSocketMessage.Type.REQUEST && + message.request + ) { + handleRequest( + new IncomingWebSocketRequest({ + verb: message.request.verb, + path: message.request.path, + body: message.request.body, + headers: message.request.headers, + id: message.request.id, + socket, + }) ); - if ( - message.type === - window.textsecure.protobuf.WebSocketMessage.Type.REQUEST && - message.request - ) { - handleRequest( - new IncomingWebSocketRequest({ - verb: message.request.verb, - path: message.request.path, - body: message.request.body, - headers: message.request.headers, - id: message.request.id, - socket, - }) - ); - } else if ( - message.type === - window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE && - message.response - ) { - const { response } = message; - const request = outgoing[response.id]; - if (request) { - request.response = response; - let callback = request.error; - if ( - response.status && - response.status >= 200 && - response.status < 300 - ) { - callback = request.success; - } - - if (typeof callback === 'function') { - callback(response.message, response.status, request); - } - } else { - throw new Error( - `Received response for unknown request ${message.response.id}` - ); + } else if ( + message.type === + window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE && + message.response + ) { + const { response } = message; + const request = outgoing[response.id]; + if (request) { + request.response = response; + let callback = request.error; + if ( + response.status && + response.status >= 200 && + response.status < 300 + ) { + callback = request.success; } - } - }; - if (blob instanceof ArrayBuffer) { - handleArrayBuffer(blob); - } else { - const reader = new FileReader(); - reader.onload = () => { - handleArrayBuffer(reader.result as ArrayBuffer); - }; - reader.readAsArrayBuffer(blob as any); + if (typeof callback === 'function') { + callback(response.message, response.status, request); + } + } else { + throw new Error( + `Received response for unknown request ${message.response.id}` + ); + } } }; + socket.on('message', onMessage); if (opts.keepalive) { this.keepalive = new KeepAlive(this, { @@ -217,15 +207,13 @@ export default class WebSocketResource extends EventTarget { }); const resetKeepAliveTimer = this.keepalive.reset.bind(this.keepalive); - socket.addEventListener('open', resetKeepAliveTimer); - socket.addEventListener('message', resetKeepAliveTimer); - socket.addEventListener( - 'close', - this.keepalive.stop.bind(this.keepalive) - ); + this.keepalive.reset(); + + socket.on('message', resetKeepAliveTimer); + socket.on('close', this.keepalive.stop.bind(this.keepalive)); } - socket.addEventListener('close', () => { + socket.on('close', () => { this.closed = true; }); @@ -242,7 +230,7 @@ export default class WebSocketResource extends EventTarget { socket.close(code, reason); // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore - socket.onmessage = undefined; + socket.removeListener('message', onMessage); // On linux the socket can wait a long time to emit its close event if we've // lost the internet connection. On the order of minutes. This speeds that @@ -261,6 +249,13 @@ export default class WebSocketResource extends EventTarget { }, 5000); }; } + + public forceKeepAlive(): void { + if (!this.keepalive) { + return; + } + this.keepalive.send(); + } } type KeepAliveOptionsType = { @@ -269,15 +264,15 @@ type KeepAliveOptionsType = { }; class KeepAlive { - keepAliveTimer: any; + private keepAliveTimer: NodeJS.Timeout | undefined; - disconnectTimer: any; + private disconnectTimer: NodeJS.Timeout | undefined; - path: string; + private path: string; - disconnect: boolean; + private disconnect: boolean; - wsr: WebSocketResource; + private wsr: WebSocketResource; constructor( websocketResource: WebSocketResource, @@ -292,30 +287,46 @@ class KeepAlive { } } - stop() { - clearTimeout(this.keepAliveTimer); - clearTimeout(this.disconnectTimer); + public stop(): void { + this.clearTimers(); } - reset() { - clearTimeout(this.keepAliveTimer); - clearTimeout(this.disconnectTimer); - this.keepAliveTimer = setTimeout(() => { - if (this.disconnect) { - // automatically disconnect if server doesn't ack - this.disconnectTimer = setTimeout(() => { - clearTimeout(this.keepAliveTimer); - this.wsr.close(3001, 'No response to keepalive request'); - }, 10000); - } else { - this.reset(); - } - window.log.info('Sending a keepalive message'); - this.wsr.sendRequest({ - verb: 'GET', - path: this.path, - success: this.reset.bind(this), - }); - }, 55000); + public send(): void { + this.clearTimers(); + + if (this.disconnect) { + // automatically disconnect if server doesn't ack + this.disconnectTimer = setTimeout(() => { + this.clearTimers(); + + this.wsr.close(3001, 'No response to keepalive request'); + }, 10000); + } else { + this.reset(); + } + + window.log.info('WebSocketResources: Sending a keepalive message'); + this.wsr.sendRequest({ + verb: 'GET', + path: this.path, + success: this.reset.bind(this), + }); + } + + public reset(): void { + this.clearTimers(); + + this.keepAliveTimer = setTimeout(() => this.send(), 55000); + } + + private clearTimers(): void { + if (this.keepAliveTimer) { + clearTimeout(this.keepAliveTimer); + this.keepAliveTimer = undefined; + } + if (this.disconnectTimer) { + clearTimeout(this.disconnectTimer); + this.disconnectTimer = undefined; + } } } diff --git a/ts/types/SocketStatus.ts b/ts/types/SocketStatus.ts index d014fa538..90407a1ed 100644 --- a/ts/types/SocketStatus.ts +++ b/ts/types/SocketStatus.ts @@ -4,8 +4,8 @@ // Maps to values found here: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState // which are returned by libtextsecure's MessageReceiver export enum SocketStatus { - CONNECTING, - OPEN, - CLOSING, - CLOSED, + CONNECTING = 'CONNECTING', + OPEN = 'OPEN', + CLOSING = 'CLOSING', + CLOSED = 'CLOSED', } diff --git a/ts/util/BackOff.ts b/ts/util/BackOff.ts new file mode 100644 index 000000000..8e4d83bff --- /dev/null +++ b/ts/util/BackOff.ts @@ -0,0 +1,33 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export class BackOff { + private count = 0; + + constructor(private readonly timeouts: ReadonlyArray) {} + + public get(): number { + return this.timeouts[this.count]; + } + + public getAndIncrement(): number { + const result = this.get(); + if (!this.isFull()) { + this.count += 1; + } + + return result; + } + + public reset(): void { + this.count = 0; + } + + public isFull(): boolean { + return this.count === this.timeouts.length - 1; + } + + public getIndex(): number { + return this.count; + } +} diff --git a/ts/window.d.ts b/ts/window.d.ts index 885523bb6..4a73de684 100644 --- a/ts/window.d.ts +++ b/ts/window.d.ts @@ -108,6 +108,7 @@ import { ElectronLocaleType } from './util/mapToSupportLocale'; import { SignalProtocolStore } from './SignalProtocolStore'; import { StartupQueue } from './util/StartupQueue'; import * as synchronousCrypto from './util/synchronousCrypto'; +import { SocketStatus } from './types/SocketStatus'; import SyncRequest from './textsecure/SyncRequest'; import { ConversationColorType, CustomColorType } from './types/Colors'; @@ -190,7 +191,7 @@ declare global { getNodeVersion: () => string; getServerPublicParams: () => string; getSfuUrl: () => string; - getSocketStatus: () => number; + getSocketStatus: () => SocketStatus; getSyncRequest: (timeoutMillis?: number) => SyncRequest; getTitle: () => string; waitForEmptyEventQueue: () => Promise;