Cache messages on receipt, remove from cache when processed

FREEBIE
This commit is contained in:
Scott Nonnenberg 2017-07-17 15:46:00 -07:00
parent e6859a3684
commit bd0050b6c6
13 changed files with 683 additions and 118 deletions

View File

@ -47,6 +47,7 @@ module.exports = function(grunt) {
'libtextsecure/storage.js',
'libtextsecure/storage/user.js',
'libtextsecure/storage/groups.js',
'libtextsecure/storage/unprocessed.js',
'libtextsecure/protobufs.js',
'libtextsecure/websocket-resources.js',
'libtextsecure/helpers.js',

View File

@ -162,7 +162,7 @@
return;
}
ConversationController.create(c).save();
ConversationController.create(c).save().then(ev.confirm);
}
function onGroupReceived(ev) {
@ -179,14 +179,24 @@
} else {
attributes.left = true;
}
var conversation = ConversationController.create(attributes);
conversation.save();
conversation.save().then(ev.confirm);
}
function onMessageReceived(ev) {
var data = ev.data;
var message = initIncomingMessage(data.source, data.timestamp);
message.handleDataMessage(data.message);
var message = initIncomingMessage(data);
isMessageDuplicate(message).then(function(isDuplicate) {
if (isDuplicate) {
console.log('Received duplicate message', message.idForLogging());
ev.confirm();
return;
}
message.handleDataMessage(data.message, ev.confirm);
});
}
function onSentMessage(ev) {
@ -195,6 +205,7 @@
var message = new Whisper.Message({
source : textsecure.storage.user.getNumber(),
sourceDevice : data.device,
sent_at : data.timestamp,
received_at : now,
conversationId : data.destination,
@ -203,17 +214,53 @@
expirationStartTimestamp: data.expirationStartTimestamp,
});
message.handleDataMessage(data.message);
isMessageDuplicate(message).then(function(isDuplicate) {
if (isDuplicate) {
console.log('Received duplicate message', message.idForLogging());
ev.confirm();
return;
}
message.handleDataMessage(data.message, ev.confirm);
});
}
function initIncomingMessage(source, timestamp) {
function isMessageDuplicate(message) {
return new Promise(function(resolve) {
var fetcher = new Whisper.Message();
var options = {
index: {
name: 'unique',
value: [
message.get('source'),
message.get('sourceDevice'),
message.get('sent_at')
]
}
};
fetcher.fetch(options).always(function() {
if (fetcher.get('id')) {
return resolve(true);
}
return resolve(false);
});
}).catch(function(error) {
console.log('isMessageDuplicate error:', error && error.stack ? error.stack : error);
return false;
});
}
function initIncomingMessage(data) {
var now = new Date().getTime();
var message = new Whisper.Message({
source : source,
sent_at : timestamp,
source : data.source,
sourceDevice : data.sourceDevice,
sent_at : data.timestamp,
received_at : now,
conversationId : source,
conversationId : data.source,
type : 'incoming',
unread : 1
});
@ -284,11 +331,12 @@
var timestamp = ev.read.timestamp;
var sender = ev.read.sender;
console.log('read receipt ', sender, timestamp);
Whisper.ReadReceipts.add({
var receipt = Whisper.ReadReceipts.add({
sender : sender,
timestamp : timestamp,
read_at : read_at
});
receipt.on('remove', ev.confirm);
}
function onVerified(ev) {
@ -323,11 +371,11 @@
};
if (state === 'VERIFIED') {
contact.setVerified(options);
contact.setVerified(options).then(ev.confirm);
} else if (state === 'DEFAULT') {
contact.setVerifiedDefault(options);
contact.setVerifiedDefault(options).then(ev.confirm);
} else {
contact.setUnverified(options);
contact.setUnverified(options).then(ev.confirm);
}
}
@ -340,9 +388,11 @@
timestamp
);
Whisper.DeliveryReceipts.add({
timestamp: timestamp, source: pushMessage.source
var receipt = Whisper.DeliveryReceipts.add({
timestamp: timestamp,
source: pushMessage.source
});
receipt.on('remove', ev.confirm);
}
window.owsDesktopApp = {

View File

@ -254,6 +254,26 @@
console.log(event);
};
}
},
{
version: "14.0",
migrate: function(transaction, next) {
console.log('migration 14.0');
console.log('Adding unprocessed message store');
var unprocessed = transaction.db.createObjectStore('unprocessed');
unprocessed.createIndex('received', 'timestamp', { unique: false });
next();
}
},
{
version: "15.0",
migrate: function(transaction, next) {
console.log('migration 15.0');
console.log('Adding messages index for de-duplication');
var messages = transaction.objectStore('messages');
messages.createIndex('unique', ['source', 'sourceDevice', 'sent_at'], { unique: true });
next();
}
}
];
}());

View File

@ -40,7 +40,7 @@
}
});
var MAX_MESSAGES = 1000;
var MAX_MESSAGES = 3000;
var PHONE_REGEX = /\+\d{7,12}(\d{3})/g;
var log = new DebugLog();
if (window.console) {

View File

@ -43,7 +43,6 @@
});
}).then(function(message) {
if (message) {
this.remove(receipt);
var deliveries = message.get('delivered') || 0;
message.save({
delivered: deliveries + 1
@ -55,7 +54,9 @@
if (conversation) {
conversation.trigger('delivered', message);
}
});
this.remove(receipt);
}.bind(this));
// TODO: consider keeping a list of numbers we've
// successfully delivered to?
}

View File

@ -36906,9 +36906,9 @@ Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJ
* vim: ts=4:sw=4:expandtab
*/
'use strict';
;(function() {
'use strict';
/*********************
*** Group Storage ***
*********************/
@ -37051,6 +37051,35 @@ Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJ
};
})();
/*
* vim: ts=4:sw=4:expandtab
*/
;(function() {
'use strict';
/*****************************************
*** Not-yet-processed message storage ***
*****************************************/
window.textsecure = window.textsecure || {};
window.textsecure.storage = window.textsecure.storage || {};
window.textsecure.storage.unprocessed = {
getAll: function() {
return textsecure.storage.protocol.getAllUnprocessed();
},
add: function(data) {
return textsecure.storage.protocol.addUnprocessed(data);
},
update: function(id, updates) {
return textsecure.storage.protocol.updateUnprocessed(id, updates);
},
remove: function(id) {
return textsecure.storage.protocol.removeUnprocessed(id);
},
};
})();
;(function() {
'use strict';
window.textsecure = window.textsecure || {};
@ -38239,7 +38268,10 @@ MessageReceiver.prototype.extend({
handleRequest: this.handleRequest.bind(this),
keepalive: { path: '/v1/keepalive', disconnect: true }
});
this.pending = Promise.resolve();
this.queueAllCached();
},
close: function() {
this.socket.close(3000, 'called close');
@ -38280,25 +38312,146 @@ MessageReceiver.prototype.extend({
textsecure.crypto.decryptWebsocketMessage(request.body, this.signalingKey).then(function(plaintext) {
var envelope = textsecure.protobuf.Envelope.decode(plaintext);
// After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
request.respond(200, 'OK');
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
if (!this.isBlocked(envelope.source)) {
this.queueEnvelope(envelope);
if (this.isBlocked(envelope.source)) {
return request.respond(200, 'OK');
}
this.addToCache(envelope, plaintext).then(function() {
request.respond(200, 'OK');
this.queueEnvelope(envelope);
}.bind(this), function(error) {
console.log(
'handleRequest error trying to add message to cache:',
error && error.stack ? error.stack : error
);
});
}.bind(this)).catch(function(e) {
request.respond(500, 'Bad encrypted websocket message');
console.log("Error handling incoming message:", e);
console.log("Error handling incoming message:", e && e.stack ? e.stack : e);
var ev = new Event('error');
ev.error = e;
this.dispatchEvent(ev);
}.bind(this));
},
queueAllCached: function() {
this.getAllFromCache().then(function(items) {
for (var i = 0, max = items.length; i < max; i += 1) {
this.queueCached(items[i]);
}
}.bind(this));
},
queueCached: function(item) {
try {
var envelopePlaintext = this.stringToArrayBuffer(item.envelope);
var envelope = textsecure.protobuf.Envelope.decode(envelopePlaintext);
var decrypted = item.decrypted;
if (decrypted) {
var payloadPlaintext = this.stringToArrayBuffer(decrypted);
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
} else {
this.queueEnvelope(envelope);
}
}
catch (error) {
console.log('queueCached error handling item', item.id);
}
},
getEnvelopeId: function(envelope) {
return envelope.source + '.' + envelope.sourceDevice + ' ' + envelope.timestamp.toNumber();
},
arrayBufferToString: function(arrayBuffer) {
return new dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary');
},
stringToArrayBuffer: function(string) {
return new dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer();
},
getAllFromCache: function() {
console.log('getAllFromCache');
return textsecure.storage.unprocessed.getAll().then(function(items) {
console.log('getAllFromCache loaded', items.length, 'saved envelopes');
return Promise.all(_.map(items, function(item) {
var attempts = 1 + (item.attempts || 0);
if (attempts >= 5) {
console.log('getAllFromCache final attempt for envelope', item.id);
return textsecure.storage.unprocessed.remove(item.id);
} else {
return textsecure.storage.unprocessed.update(item.id, {attempts: attempts});
}
}.bind(this))).then(function() {
return items;
}, function(error) {
console.log(
'getAllFromCache error updating items after load:',
error && error.stack ? error.stack : error
);
return items;
});
}.bind(this));
},
addToCache: function(envelope, plaintext) {
var id = this.getEnvelopeId(envelope);
console.log('addToCache', id);
var string = this.arrayBufferToString(plaintext);
var data = {
id: id,
envelope: string,
timestamp: Date.now(),
attempts: 1
};
return textsecure.storage.unprocessed.add(data);
},
updateCache: function(envelope, plaintext) {
var id = this.getEnvelopeId(envelope);
console.log('updateCache', id);
var string = this.arrayBufferToString(plaintext);
var data = {
decrypted: string
};
return textsecure.storage.unprocessed.update(id, data);
},
removeFromCache: function(envelope) {
var id = this.getEnvelopeId(envelope);
console.log('removeFromCache', id);
return textsecure.storage.unprocessed.remove(id);
},
queueDecryptedEnvelope: function(envelope, plaintext) {
console.log('queueing decrypted envelope', this.getEnvelopeId(envelope));
var handleDecryptedEnvelope = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
this.pending = this.pending.then(handleDecryptedEnvelope, handleDecryptedEnvelope);
return this.pending.catch(function(error) {
console.log('queueDecryptedEnvelope error:', error && error.stack ? error.stack : error);
});
},
queueEnvelope: function(envelope) {
console.log('queueing envelope', this.getEnvelopeId(envelope));
var handleEnvelope = this.handleEnvelope.bind(this, envelope);
this.pending = this.pending.then(handleEnvelope, handleEnvelope);
return this.pending.catch(function(error) {
console.log('queueEnvelope error:', error && error.stack ? error.stack : error);
});
},
// Same as handleEnvelope, just without the decryption step. Necessary for handling
// messages which were successfully decrypted, but application logic didn't finish
// processing.
handleDecryptedEnvelope: function(envelope, plaintext) {
// No decryption is required for delivery receipts, so the decrypted field of
// the Unprocessed model will never be set
if (envelope.content) {
return this.innerHandleContentMessage(envelope, plaintext);
} else if (envelope.legacyMessage) {
return this.innerHandleLegacyMessage(envelope, plaintext);
} else {
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
}
},
handleEnvelope: function(envelope) {
if (envelope.type === textsecure.protobuf.Envelope.Type.RECEIPT) {
@ -38310,6 +38463,7 @@ MessageReceiver.prototype.extend({
} else if (envelope.legacyMessage) {
return this.handleLegacyMessage(envelope);
} else {
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
}
},
@ -38321,9 +38475,13 @@ MessageReceiver.prototype.extend({
}
},
onDeliveryReceipt: function (envelope) {
var ev = new Event('receipt');
ev.proto = envelope;
this.dispatchEvent(ev);
return new Promise(function(resolve) {
var ev = new Event('receipt');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.proto = envelope;
this.dispatchEvent(ev);
return resolve();
}.bind(this));
},
unpad: function(paddedPlaintext) {
paddedPlaintext = new Uint8Array(paddedPlaintext);
@ -38347,17 +38505,27 @@ MessageReceiver.prototype.extend({
var sessionCipher = new libsignal.SessionCipher(textsecure.storage.protocol, address);
switch(envelope.type) {
case textsecure.protobuf.Envelope.Type.CIPHERTEXT:
console.log('message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber());
console.log('message from', this.getEnvelopeId(envelope));
promise = sessionCipher.decryptWhisperMessage(ciphertext).then(this.unpad);
break;
case textsecure.protobuf.Envelope.Type.PREKEY_BUNDLE:
console.log('prekey message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber());
console.log('prekey message from', this.getEnvelopeId(envelope));
promise = this.decryptPreKeyWhisperMessage(ciphertext, sessionCipher, address);
break;
default:
promise = Promise.reject(new Error("Unknown message type"));
}
return promise.catch(function(error) {
return promise.then(function(plaintext) {
return this.updateCache(envelope, plaintext).then(function() {
return plaintext;
}, function(error) {
console.log(
'decrypt failed to save decrypted message contents to cache:',
error && error.stack ? error.stack : error
);
return plaintext;
});
}.bind(this)).catch(function(error) {
if (error.message === 'Unknown identity key') {
// create an error that the UI will pick up and ask the
// user if they want to re-negotiate
@ -38390,7 +38558,7 @@ MessageReceiver.prototype.extend({
throw e;
});
},
handleSentMessage: function(destination, timestamp, message, expirationStartTimestamp) {
handleSentMessage: function(envelope, destination, timestamp, message, expirationStartTimestamp) {
var p = Promise.resolve();
if ((message.flags & textsecure.protobuf.DataMessage.Flags.END_SESSION) ==
textsecure.protobuf.DataMessage.Flags.END_SESSION ) {
@ -38399,9 +38567,11 @@ MessageReceiver.prototype.extend({
return p.then(function() {
return this.processDecrypted(message, this.number).then(function(message) {
var ev = new Event('sent');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.data = {
destination : destination,
timestamp : timestamp.toNumber(),
device : envelope.sourceDevice,
message : message
};
if (expirationStartTimestamp) {
@ -38422,10 +38592,12 @@ MessageReceiver.prototype.extend({
return p.then(function() {
return this.processDecrypted(message, envelope.source).then(function(message) {
var ev = new Event('message');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.data = {
source : envelope.source,
timestamp : envelope.timestamp.toNumber(),
message : message
source : envelope.source,
sourceDevice : envelope.sourceDevice,
timestamp : envelope.timestamp.toNumber(),
message : message
};
this.dispatchEvent(ev);
}.bind(this));
@ -38433,27 +38605,35 @@ MessageReceiver.prototype.extend({
},
handleLegacyMessage: function (envelope) {
return this.decrypt(envelope, envelope.legacyMessage).then(function(plaintext) {
var message = textsecure.protobuf.DataMessage.decode(plaintext);
return this.handleDataMessage(envelope, message);
return this.innerHandleLegacyMessage(envelope, plaintext);
}.bind(this));
},
innerHandleLegacyMessage: function (envelope, plaintext) {
var message = textsecure.protobuf.DataMessage.decode(plaintext);
return this.handleDataMessage(envelope, message);
},
handleContentMessage: function (envelope) {
return this.decrypt(envelope, envelope.content).then(function(plaintext) {
var content = textsecure.protobuf.Content.decode(plaintext);
if (content.syncMessage) {
return this.handleSyncMessage(envelope, content.syncMessage);
} else if (content.dataMessage) {
return this.handleDataMessage(envelope, content.dataMessage);
} else if (content.nullMessage) {
return this.handleNullMessage(envelope, content.nullMessage);
} else {
throw new Error('Unsupported content message');
}
this.innerHandleContentMessage(envelope, plaintext);
}.bind(this));
},
innerHandleContentMessage: function(envelope, plaintext) {
var content = textsecure.protobuf.Content.decode(plaintext);
if (content.syncMessage) {
return this.handleSyncMessage(envelope, content.syncMessage);
} else if (content.dataMessage) {
return this.handleDataMessage(envelope, content.dataMessage);
} else if (content.nullMessage) {
return this.handleNullMessage(envelope, content.nullMessage);
} else {
this.removeFromCache(envelope);
throw new Error('Unsupported content message');
}
},
handleNullMessage: function(envelope, nullMessage) {
var encodedNumber = envelope.source + '.' + envelope.sourceDevice;
console.log('null message from', encodedNumber, envelope.timestamp.toNumber());
this.removeFromCache(envelope);
},
handleSyncMessage: function(envelope, syncMessage) {
if (envelope.source !== this.number) {
@ -38470,34 +38650,37 @@ MessageReceiver.prototype.extend({
'from', envelope.source + '.' + envelope.sourceDevice
);
return this.handleSentMessage(
envelope,
sentMessage.destination,
sentMessage.timestamp,
sentMessage.message,
sentMessage.expirationStartTimestamp
);
} else if (syncMessage.contacts) {
this.handleContacts(syncMessage.contacts);
this.handleContacts(envelope, syncMessage.contacts);
} else if (syncMessage.groups) {
this.handleGroups(syncMessage.groups);
this.handleGroups(envelope, syncMessage.groups);
} else if (syncMessage.blocked) {
this.handleBlocked(syncMessage.blocked);
this.handleBlocked(envelope, syncMessage.blocked);
} else if (syncMessage.request) {
console.log('Got SyncMessage Request');
this.removeFromCache(envelope);
} else if (syncMessage.read && syncMessage.read.length) {
console.log('read messages',
'from', envelope.source + '.' + envelope.sourceDevice);
this.handleRead(syncMessage.read, envelope.timestamp);
this.handleRead(envelope, syncMessage.read);
} else if (syncMessage.verified) {
this.handleVerified(syncMessage.verified);
this.handleVerified(envelope, syncMessage.verified);
} else {
throw new Error('Got empty SyncMessage');
}
},
handleVerified: function(verified, options) {
handleVerified: function(envelope, verified, options) {
options = options || {};
_.defaults(options, {viaContactSync: false});
var ev = new Event('verified');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.verified = {
state: verified.state,
destination: verified.destination,
@ -38506,10 +38689,11 @@ MessageReceiver.prototype.extend({
ev.viaContactSync = options.viaContactSync;
this.dispatchEvent(ev);
},
handleRead: function(read, timestamp) {
handleRead: function(envelope, read) {
for (var i = 0; i < read.length; ++i) {
var ev = new Event('read');
ev.timestamp = timestamp.toNumber();
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.timestamp = envelope.timestamp.toNumber();
ev.read = {
timestamp : read[i].timestamp.toNumber(),
sender : read[i].sender
@ -38517,7 +38701,7 @@ MessageReceiver.prototype.extend({
this.dispatchEvent(ev);
}
},
handleContacts: function(contacts) {
handleContacts: function(envelope, contacts) {
console.log('contact sync');
var eventTarget = this;
var attachmentPointer = contacts.blob;
@ -38526,19 +38710,23 @@ MessageReceiver.prototype.extend({
var contactDetails = contactBuffer.next();
while (contactDetails !== undefined) {
var ev = new Event('contact');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.contactDetails = contactDetails;
eventTarget.dispatchEvent(ev);
if (contactDetails.verified) {
this.handleVerified(contactDetails.verified, {viaContactSync: true});
this.handleVerified(envelope, contactDetails.verified, {viaContactSync: true});
}
contactDetails = contactBuffer.next();
}
eventTarget.dispatchEvent(new Event('contactsync'));
var ev = new Event('contactsync');
ev.confirm = this.removeFromCache.bind(this, envelope);
eventTarget.dispatchEvent(ev);
}.bind(this));
},
handleGroups: function(groups) {
handleGroups: function(envelope, groups) {
console.log('group sync');
var eventTarget = this;
var attachmentPointer = groups.blob;
@ -38567,18 +38755,22 @@ MessageReceiver.prototype.extend({
}
})(groupDetails).then(function(groupDetails) {
var ev = new Event('group');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.groupDetails = groupDetails;
eventTarget.dispatchEvent(ev);
}).catch(function(e) {
}.bind(this)).catch(function(e) {
console.log('error processing group', e);
});
groupDetails = groupBuffer.next();
promises.push(promise);
}
Promise.all(promises).then(function() {
eventTarget.dispatchEvent(new Event('groupsync'));
});
});
var ev = new Event('groupsync');
ev.confirm = this.removeFromCache.bind(this, envelope);
eventTarget.dispatchEvent(ev);
}.bind(this));
}.bind(this));
},
handleBlocked: function(blocked) {
textsecure.storage.put('blocked', blocked.numbers);

View File

@ -15,7 +15,10 @@
this.on('change:expireTimer', this.setToExpire);
this.setToExpire();
},
defaults : function() {
idForLogging: function() {
return this.get('source') + '.' + this.get('sourceDevice') + ' ' + this.get('sent_at');
},
defaults: function() {
return {
timestamp: new Date().getTime(),
attachments: []
@ -339,7 +342,7 @@
this.send(promise);
}
},
handleDataMessage: function(dataMessage) {
handleDataMessage: function(dataMessage, confirm) {
// This function can be called from the background script on an
// incoming message or from the frontend after the user accepts an
// identity key change.
@ -351,13 +354,13 @@
if (dataMessage.group) {
conversationId = dataMessage.group.id;
}
console.log('queuing handleDataMessage', source, timestamp);
console.log('queuing handleDataMessage', message.idForLogging());
var conversation = ConversationController.create({id: conversationId});
conversation.queueJob(function() {
return new Promise(function(resolve) {
conversation.fetch().always(function() {
console.log('starting handleDataMessage', source, timestamp);
console.log('starting handleDataMessage', message.idForLogging());
var now = new Date().getTime();
var attributes = { type: 'private' };
@ -468,15 +471,18 @@
});
}
console.log('beginning saves in handleDataMessage', source, timestamp);
console.log('beginning saves in handleDataMessage', message.idForLogging());
var handleError = function(error) {
error = error && error.stack ? error.stack : error;
console.log('handleDataMessage', source, timestamp, 'error:', error);
console.log('handleDataMessage', message.idForLogging(), 'error:', error);
return resolve();
};
message.save().then(function() {
// throw new Error('Something went wrong!');
conversation.save().then(function() {
try {
conversation.trigger('newmessage', message);
@ -501,7 +507,11 @@
conversation.notify(message);
}
console.log('done with handleDataMessage', source, timestamp);
console.log('done with handleDataMessage', message.idForLogging());
if (confirm) {
confirm();
}
return resolve();
}
catch (e) {

View File

@ -27,9 +27,9 @@
message.get('source') === receipt.get('sender'));
});
if (message) {
this.remove(receipt);
message.markRead(receipt.get('read_at')).then(function() {
this.notifyConversation(message);
this.remove(receipt);
}.bind(this));
} else {
console.log('No message for read receipt');

View File

@ -104,6 +104,13 @@
return this.fetch({range: [number + '.1', number + '.' + ':']});
}
});
var Unprocessed = Model.extend({ storeName : 'unprocessed' });
var UnprocessedCollection = Backbone.Collection.extend({
storeName : 'unprocessed',
database : Whisper.Database,
model : Unprocessed,
comparator : 'timestamp'
});
var IdentityRecord = Model.extend({
storeName: 'identityKeys',
validAttributes: [
@ -740,6 +747,8 @@
resolve(textsecure.storage.protocol.removeAllSessions(number));
});
},
// Groups
getGroup: function(groupId) {
if (groupId === null || groupId === undefined) {
throw new Error("Tried to get group for undefined/null id");
@ -773,6 +782,41 @@
});
},
// Not yet processed messages - for resiliency
getAllUnprocessed: function() {
var collection;
return new Promise(function(resolve, reject) {
collection = new UnprocessedCollection();
return collection.fetch().then(resolve, reject);
}).then(function() {
// Return a plain array of plain objects
return collection.map('attributes');
});
},
addUnprocessed: function(data) {
return new Promise(function(resolve, reject) {
var unprocessed = new Unprocessed(data);
return unprocessed.save().then(resolve, reject);
});
},
updateUnprocessed: function(id, updates) {
return new Promise(function(resolve, reject) {
var unprocessed = new Unprocessed({
id: id
});
return unprocessed.fetch().then(function() {
return unprocessed.save(updates).then(resolve, reject);
}, reject);
}.bind(this));
},
removeUnprocessed: function(id) {
return new Promise(function(resolve, reject) {
var unprocessed = new Unprocessed({
id: id
});
return unprocessed.destroy().then(resolve, reject);
}.bind(this));
},
};
_.extend(SignalProtocolStore.prototype, Backbone.Events);

View File

@ -30,7 +30,10 @@ MessageReceiver.prototype.extend({
handleRequest: this.handleRequest.bind(this),
keepalive: { path: '/v1/keepalive', disconnect: true }
});
this.pending = Promise.resolve();
this.queueAllCached();
},
close: function() {
this.socket.close(3000, 'called close');
@ -71,25 +74,146 @@ MessageReceiver.prototype.extend({
textsecure.crypto.decryptWebsocketMessage(request.body, this.signalingKey).then(function(plaintext) {
var envelope = textsecure.protobuf.Envelope.decode(plaintext);
// After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
request.respond(200, 'OK');
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
if (!this.isBlocked(envelope.source)) {
this.queueEnvelope(envelope);
if (this.isBlocked(envelope.source)) {
return request.respond(200, 'OK');
}
this.addToCache(envelope, plaintext).then(function() {
request.respond(200, 'OK');
this.queueEnvelope(envelope);
}.bind(this), function(error) {
console.log(
'handleRequest error trying to add message to cache:',
error && error.stack ? error.stack : error
);
});
}.bind(this)).catch(function(e) {
request.respond(500, 'Bad encrypted websocket message');
console.log("Error handling incoming message:", e);
console.log("Error handling incoming message:", e && e.stack ? e.stack : e);
var ev = new Event('error');
ev.error = e;
this.dispatchEvent(ev);
}.bind(this));
},
queueAllCached: function() {
this.getAllFromCache().then(function(items) {
for (var i = 0, max = items.length; i < max; i += 1) {
this.queueCached(items[i]);
}
}.bind(this));
},
queueCached: function(item) {
try {
var envelopePlaintext = this.stringToArrayBuffer(item.envelope);
var envelope = textsecure.protobuf.Envelope.decode(envelopePlaintext);
var decrypted = item.decrypted;
if (decrypted) {
var payloadPlaintext = this.stringToArrayBuffer(decrypted);
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
} else {
this.queueEnvelope(envelope);
}
}
catch (error) {
console.log('queueCached error handling item', item.id);
}
},
getEnvelopeId: function(envelope) {
return envelope.source + '.' + envelope.sourceDevice + ' ' + envelope.timestamp.toNumber();
},
arrayBufferToString: function(arrayBuffer) {
return new dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary');
},
stringToArrayBuffer: function(string) {
return new dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer();
},
getAllFromCache: function() {
console.log('getAllFromCache');
return textsecure.storage.unprocessed.getAll().then(function(items) {
console.log('getAllFromCache loaded', items.length, 'saved envelopes');
return Promise.all(_.map(items, function(item) {
var attempts = 1 + (item.attempts || 0);
if (attempts >= 5) {
console.log('getAllFromCache final attempt for envelope', item.id);
return textsecure.storage.unprocessed.remove(item.id);
} else {
return textsecure.storage.unprocessed.update(item.id, {attempts: attempts});
}
}.bind(this))).then(function() {
return items;
}, function(error) {
console.log(
'getAllFromCache error updating items after load:',
error && error.stack ? error.stack : error
);
return items;
});
}.bind(this));
},
addToCache: function(envelope, plaintext) {
var id = this.getEnvelopeId(envelope);
console.log('addToCache', id);
var string = this.arrayBufferToString(plaintext);
var data = {
id: id,
envelope: string,
timestamp: Date.now(),
attempts: 1
};
return textsecure.storage.unprocessed.add(data);
},
updateCache: function(envelope, plaintext) {
var id = this.getEnvelopeId(envelope);
console.log('updateCache', id);
var string = this.arrayBufferToString(plaintext);
var data = {
decrypted: string
};
return textsecure.storage.unprocessed.update(id, data);
},
removeFromCache: function(envelope) {
var id = this.getEnvelopeId(envelope);
console.log('removeFromCache', id);
return textsecure.storage.unprocessed.remove(id);
},
queueDecryptedEnvelope: function(envelope, plaintext) {
console.log('queueing decrypted envelope', this.getEnvelopeId(envelope));
var handleDecryptedEnvelope = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
this.pending = this.pending.then(handleDecryptedEnvelope, handleDecryptedEnvelope);
return this.pending.catch(function(error) {
console.log('queueDecryptedEnvelope error:', error && error.stack ? error.stack : error);
});
},
queueEnvelope: function(envelope) {
console.log('queueing envelope', this.getEnvelopeId(envelope));
var handleEnvelope = this.handleEnvelope.bind(this, envelope);
this.pending = this.pending.then(handleEnvelope, handleEnvelope);
return this.pending.catch(function(error) {
console.log('queueEnvelope error:', error && error.stack ? error.stack : error);
});
},
// Same as handleEnvelope, just without the decryption step. Necessary for handling
// messages which were successfully decrypted, but application logic didn't finish
// processing.
handleDecryptedEnvelope: function(envelope, plaintext) {
// No decryption is required for delivery receipts, so the decrypted field of
// the Unprocessed model will never be set
if (envelope.content) {
return this.innerHandleContentMessage(envelope, plaintext);
} else if (envelope.legacyMessage) {
return this.innerHandleLegacyMessage(envelope, plaintext);
} else {
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
}
},
handleEnvelope: function(envelope) {
if (envelope.type === textsecure.protobuf.Envelope.Type.RECEIPT) {
@ -101,6 +225,7 @@ MessageReceiver.prototype.extend({
} else if (envelope.legacyMessage) {
return this.handleLegacyMessage(envelope);
} else {
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
}
},
@ -112,9 +237,13 @@ MessageReceiver.prototype.extend({
}
},
onDeliveryReceipt: function (envelope) {
var ev = new Event('receipt');
ev.proto = envelope;
this.dispatchEvent(ev);
return new Promise(function(resolve) {
var ev = new Event('receipt');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.proto = envelope;
this.dispatchEvent(ev);
return resolve();
}.bind(this));
},
unpad: function(paddedPlaintext) {
paddedPlaintext = new Uint8Array(paddedPlaintext);
@ -138,17 +267,27 @@ MessageReceiver.prototype.extend({
var sessionCipher = new libsignal.SessionCipher(textsecure.storage.protocol, address);
switch(envelope.type) {
case textsecure.protobuf.Envelope.Type.CIPHERTEXT:
console.log('message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber());
console.log('message from', this.getEnvelopeId(envelope));
promise = sessionCipher.decryptWhisperMessage(ciphertext).then(this.unpad);
break;
case textsecure.protobuf.Envelope.Type.PREKEY_BUNDLE:
console.log('prekey message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber());
console.log('prekey message from', this.getEnvelopeId(envelope));
promise = this.decryptPreKeyWhisperMessage(ciphertext, sessionCipher, address);
break;
default:
promise = Promise.reject(new Error("Unknown message type"));
}
return promise.catch(function(error) {
return promise.then(function(plaintext) {
return this.updateCache(envelope, plaintext).then(function() {
return plaintext;
}, function(error) {
console.log(
'decrypt failed to save decrypted message contents to cache:',
error && error.stack ? error.stack : error
);
return plaintext;
});
}.bind(this)).catch(function(error) {
if (error.message === 'Unknown identity key') {
// create an error that the UI will pick up and ask the
// user if they want to re-negotiate
@ -181,7 +320,7 @@ MessageReceiver.prototype.extend({
throw e;
});
},
handleSentMessage: function(destination, timestamp, message, expirationStartTimestamp) {
handleSentMessage: function(envelope, destination, timestamp, message, expirationStartTimestamp) {
var p = Promise.resolve();
if ((message.flags & textsecure.protobuf.DataMessage.Flags.END_SESSION) ==
textsecure.protobuf.DataMessage.Flags.END_SESSION ) {
@ -190,9 +329,11 @@ MessageReceiver.prototype.extend({
return p.then(function() {
return this.processDecrypted(message, this.number).then(function(message) {
var ev = new Event('sent');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.data = {
destination : destination,
timestamp : timestamp.toNumber(),
device : envelope.sourceDevice,
message : message
};
if (expirationStartTimestamp) {
@ -213,10 +354,12 @@ MessageReceiver.prototype.extend({
return p.then(function() {
return this.processDecrypted(message, envelope.source).then(function(message) {
var ev = new Event('message');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.data = {
source : envelope.source,
timestamp : envelope.timestamp.toNumber(),
message : message
source : envelope.source,
sourceDevice : envelope.sourceDevice,
timestamp : envelope.timestamp.toNumber(),
message : message
};
this.dispatchEvent(ev);
}.bind(this));
@ -224,27 +367,35 @@ MessageReceiver.prototype.extend({
},
handleLegacyMessage: function (envelope) {
return this.decrypt(envelope, envelope.legacyMessage).then(function(plaintext) {
var message = textsecure.protobuf.DataMessage.decode(plaintext);
return this.handleDataMessage(envelope, message);
return this.innerHandleLegacyMessage(envelope, plaintext);
}.bind(this));
},
innerHandleLegacyMessage: function (envelope, plaintext) {
var message = textsecure.protobuf.DataMessage.decode(plaintext);
return this.handleDataMessage(envelope, message);
},
handleContentMessage: function (envelope) {
return this.decrypt(envelope, envelope.content).then(function(plaintext) {
var content = textsecure.protobuf.Content.decode(plaintext);
if (content.syncMessage) {
return this.handleSyncMessage(envelope, content.syncMessage);
} else if (content.dataMessage) {
return this.handleDataMessage(envelope, content.dataMessage);
} else if (content.nullMessage) {
return this.handleNullMessage(envelope, content.nullMessage);
} else {
throw new Error('Unsupported content message');
}
this.innerHandleContentMessage(envelope, plaintext);
}.bind(this));
},
innerHandleContentMessage: function(envelope, plaintext) {
var content = textsecure.protobuf.Content.decode(plaintext);
if (content.syncMessage) {
return this.handleSyncMessage(envelope, content.syncMessage);
} else if (content.dataMessage) {
return this.handleDataMessage(envelope, content.dataMessage);
} else if (content.nullMessage) {
return this.handleNullMessage(envelope, content.nullMessage);
} else {
this.removeFromCache(envelope);
throw new Error('Unsupported content message');
}
},
handleNullMessage: function(envelope, nullMessage) {
var encodedNumber = envelope.source + '.' + envelope.sourceDevice;
console.log('null message from', encodedNumber, envelope.timestamp.toNumber());
this.removeFromCache(envelope);
},
handleSyncMessage: function(envelope, syncMessage) {
if (envelope.source !== this.number) {
@ -261,34 +412,37 @@ MessageReceiver.prototype.extend({
'from', envelope.source + '.' + envelope.sourceDevice
);
return this.handleSentMessage(
envelope,
sentMessage.destination,
sentMessage.timestamp,
sentMessage.message,
sentMessage.expirationStartTimestamp
);
} else if (syncMessage.contacts) {
this.handleContacts(syncMessage.contacts);
this.handleContacts(envelope, syncMessage.contacts);
} else if (syncMessage.groups) {
this.handleGroups(syncMessage.groups);
this.handleGroups(envelope, syncMessage.groups);
} else if (syncMessage.blocked) {
this.handleBlocked(syncMessage.blocked);
this.handleBlocked(envelope, syncMessage.blocked);
} else if (syncMessage.request) {
console.log('Got SyncMessage Request');
this.removeFromCache(envelope);
} else if (syncMessage.read && syncMessage.read.length) {
console.log('read messages',
'from', envelope.source + '.' + envelope.sourceDevice);
this.handleRead(syncMessage.read, envelope.timestamp);
this.handleRead(envelope, syncMessage.read);
} else if (syncMessage.verified) {
this.handleVerified(syncMessage.verified);
this.handleVerified(envelope, syncMessage.verified);
} else {
throw new Error('Got empty SyncMessage');
}
},
handleVerified: function(verified, options) {
handleVerified: function(envelope, verified, options) {
options = options || {};
_.defaults(options, {viaContactSync: false});
var ev = new Event('verified');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.verified = {
state: verified.state,
destination: verified.destination,
@ -297,10 +451,11 @@ MessageReceiver.prototype.extend({
ev.viaContactSync = options.viaContactSync;
this.dispatchEvent(ev);
},
handleRead: function(read, timestamp) {
handleRead: function(envelope, read) {
for (var i = 0; i < read.length; ++i) {
var ev = new Event('read');
ev.timestamp = timestamp.toNumber();
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.timestamp = envelope.timestamp.toNumber();
ev.read = {
timestamp : read[i].timestamp.toNumber(),
sender : read[i].sender
@ -308,7 +463,7 @@ MessageReceiver.prototype.extend({
this.dispatchEvent(ev);
}
},
handleContacts: function(contacts) {
handleContacts: function(envelope, contacts) {
console.log('contact sync');
var eventTarget = this;
var attachmentPointer = contacts.blob;
@ -317,19 +472,23 @@ MessageReceiver.prototype.extend({
var contactDetails = contactBuffer.next();
while (contactDetails !== undefined) {
var ev = new Event('contact');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.contactDetails = contactDetails;
eventTarget.dispatchEvent(ev);
if (contactDetails.verified) {
this.handleVerified(contactDetails.verified, {viaContactSync: true});
this.handleVerified(envelope, contactDetails.verified, {viaContactSync: true});
}
contactDetails = contactBuffer.next();
}
eventTarget.dispatchEvent(new Event('contactsync'));
var ev = new Event('contactsync');
ev.confirm = this.removeFromCache.bind(this, envelope);
eventTarget.dispatchEvent(ev);
}.bind(this));
},
handleGroups: function(groups) {
handleGroups: function(envelope, groups) {
console.log('group sync');
var eventTarget = this;
var attachmentPointer = groups.blob;
@ -358,18 +517,22 @@ MessageReceiver.prototype.extend({
}
})(groupDetails).then(function(groupDetails) {
var ev = new Event('group');
ev.confirm = this.removeFromCache.bind(this, envelope);
ev.groupDetails = groupDetails;
eventTarget.dispatchEvent(ev);
}).catch(function(e) {
}.bind(this)).catch(function(e) {
console.log('error processing group', e);
});
groupDetails = groupBuffer.next();
promises.push(promise);
}
Promise.all(promises).then(function() {
eventTarget.dispatchEvent(new Event('groupsync'));
});
});
var ev = new Event('groupsync');
ev.confirm = this.removeFromCache.bind(this, envelope);
eventTarget.dispatchEvent(ev);
}.bind(this));
}.bind(this));
},
handleBlocked: function(blocked) {
textsecure.storage.put('blocked', blocked.numbers);

View File

@ -2,9 +2,9 @@
* vim: ts=4:sw=4:expandtab
*/
'use strict';
;(function() {
'use strict';
/*********************
*** Group Storage ***
*********************/

View File

@ -0,0 +1,28 @@
/*
* vim: ts=4:sw=4:expandtab
*/
;(function() {
'use strict';
/*****************************************
*** Not-yet-processed message storage ***
*****************************************/
window.textsecure = window.textsecure || {};
window.textsecure.storage = window.textsecure.storage || {};
window.textsecure.storage.unprocessed = {
getAll: function() {
return textsecure.storage.protocol.getAllUnprocessed();
},
add: function(data) {
return textsecure.storage.protocol.addUnprocessed(data);
},
update: function(id, updates) {
return textsecure.storage.protocol.updateUnprocessed(id, updates);
},
remove: function(id) {
return textsecure.storage.protocol.removeUnprocessed(id);
},
};
})();

View File

@ -874,4 +874,60 @@ describe("SignalProtocolStore", function() {
}).then(done,done);
});
});
describe('Not yet processed messages', function() {
beforeEach(function() {
return store.getAllUnprocessed().then(function(items) {
return Promise.all(_.map(items, function(item) {
return store.removeUnprocessed(item.id);
}));
}).then(function() {
return store.getAllUnprocessed();
}).then(function(items) {
assert.strictEqual(items.length, 0);
});
});
it('adds two and gets them back', function() {
return Promise.all([
store.addUnprocessed({id: 2, name: 'second', timestamp: 2}),
store.addUnprocessed({id: 3, name: 'third', timestamp: 3}),
store.addUnprocessed({id: 1, name: 'first', timestamp: 1})
]).then(function() {
return store.getAllUnprocessed();
}).then(function(items) {
assert.strictEqual(items.length, 3);
// they are in the proper order because the collection comparator is 'timestamp'
assert.strictEqual(items[0].name, 'first');
assert.strictEqual(items[1].name, 'second');
assert.strictEqual(items[2].name, 'third');
});
});
it('updateUnprocessed successfully updates only part of itme', function() {
var id = 1;
return store.addUnprocessed({id: id, name: 'first', timestamp: 1}).then(function() {
return store.updateUnprocessed(id, {name: 'updated'});
}).then(function() {
return store.getAllUnprocessed();
}).then(function(items) {
assert.strictEqual(items.length, 1);
assert.strictEqual(items[0].name, 'updated');
assert.strictEqual(items[0].timestamp, 1);
});
});
it('removeUnprocessed successfully deletes item', function() {
var id = 1;
return store.addUnprocessed({id: id, name: 'first', timestamp: 1}).then(function() {
return store.removeUnprocessed(id);
}).then(function() {
return store.getAllUnprocessed();
}).then(function(items) {
assert.strictEqual(items.length, 0);
});
});
});
});