From 3732ffbea5e1ab0a235409fe88a942a8f5221879 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 14 Jul 2017 11:14:06 +0430 Subject: [PATCH 1/4] publish all client subs at once --- index.js | 57 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/index.js b/index.js index 65880e9..5640f78 100644 --- a/index.js +++ b/index.js @@ -37,19 +37,23 @@ function CachedPersistence (opts) { this._onMessage = function onSubMessage (packet, cb) { var decoded = JSON.parse(packet.payload) - if (packet.topic === newSubTopic) { - if (!checkSubsForClient(decoded, that._matcher.match(decoded.topic))) { - that._matcher.add(decoded.topic, decoded) + var clientId + for (var i = 0; i < decoded.subs.length; i++) { + var sub = decoded.subs[i] + clientId = sub.clientId + if (packet.topic === newSubTopic) { + if (!checkSubsForClient(sub, that._matcher.match(sub.topic))) { + that._matcher.add(sub.topic, sub) + } + } else if (packet.topic === rmSubTopic) { + that._matcher + .match(sub.topic) + .filter(matching, sub) + .forEach(rmSub, that._matcher) } - } else if (packet.topic === rmSubTopic) { - that._matcher - .match(decoded.topic) - .filter(matching, decoded) - .forEach(rmSub, that._matcher) } - var key = decoded.clientId + '-' + decoded.topic - var waiting = that._waiting[key] - delete that._waiting[key] + var waiting = that._waiting[clientId] + delete that._waiting[clientId] if (waiting) { process.nextTick(waiting) } @@ -65,16 +69,10 @@ function rmSub (sub) { this.remove(sub.topic, sub) } -function Sub (clientId, topic, qos) { - this.clientId = clientId - this.topic = topic - this.qos = qos -} - inherits(CachedPersistence, EE) -CachedPersistence.prototype._waitFor = function (client, topic, cb) { - this._waiting[client.id + '-' + topic] = cb +CachedPersistence.prototype._waitFor = function (client, cb) { + this._waiting[client.id] = cb } CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { @@ -85,21 +83,22 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { subs = subs.filter(qosGreaterThanOne) - this._parallel({ + var ctx = { cb: cb || noop, client: client, broker: this._broker, - topic: newSubTopic - }, brokerPublish, subs, addedSubDone) + topic: newSubTopic, + brokerPublish: brokerPublish + } + ctx.brokerPublish(subs, addedSubDone.bind(ctx)) } function qosGreaterThanOne (sub) { return sub.qos > 0 } -function brokerPublish (sub, cb) { - var client = this.client - var encoded = JSON.stringify(new Sub(client.id, sub.topic, sub.qos)) +function brokerPublish (subs, cb) { + var encoded = JSON.stringify({subs: subs}) var packet = new Packet({ topic: this.topic, payload: encoded @@ -114,12 +113,14 @@ function addedSubDone () { function noop () {} CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) { - this._parallel({ + var ctx = { cb: cb || noop, client: client, broker: this._broker, - topic: rmSubTopic - }, brokerPublish, subs, addedSubDone) + topic: rmSubTopic, + brokerPublish: brokerPublish + } + ctx.brokerPublish(subs, addedSubDone.bind(ctx)) } CachedPersistence.prototype.subscriptionsByTopic = function (topic, cb) { From fc27c3ce9bdbc8e8a4adf8b2beb8e2e40dd8e8bd Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 14 Jul 2017 11:15:48 +0430 Subject: [PATCH 2/4] removed parallel --- index.js | 2 -- package.json | 1 - 2 files changed, 3 deletions(-) diff --git a/index.js b/index.js index 5640f78..19a36ea 100644 --- a/index.js +++ b/index.js @@ -4,7 +4,6 @@ var Qlobber = require('qlobber').Qlobber var Packet = require('aedes-packet') var EE = require('events').EventEmitter var inherits = require('util').inherits -var fastparallel = require('fastparallel') var MultiStream = require('multistream') var QlobberOpts = { @@ -27,7 +26,6 @@ function CachedPersistence (opts) { this.destroyed = false this._matcher = new Qlobber(QlobberOpts) this._waiting = {} - this._parallel = fastparallel({ results: false }) var that = this diff --git a/package.json b/package.json index e8f5898..be472f6 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,6 @@ "dependencies": { "aedes-packet": "^1.0.0", "aedes-persistence": "^4.0.0", - "fastparallel": "^2.2.1", "multistream": "^2.1.0", "qlobber": "^0.8.0" } From e4592314f005edecab0b0d6231815a93698f8f0f Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 14 Jul 2017 16:09:33 +0430 Subject: [PATCH 3/4] replace bind with closure --- index.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index 19a36ea..3c33f89 100644 --- a/index.js +++ b/index.js @@ -88,7 +88,9 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { topic: newSubTopic, brokerPublish: brokerPublish } - ctx.brokerPublish(subs, addedSubDone.bind(ctx)) + ctx.brokerPublish(subs, function () { + cb(null, client) + }) } function qosGreaterThanOne (sub) { @@ -104,10 +106,6 @@ function brokerPublish (subs, cb) { this.broker.publish(packet, cb) } -function addedSubDone () { - this.cb(null, this.client) -} - function noop () {} CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) { @@ -118,7 +116,9 @@ CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) topic: rmSubTopic, brokerPublish: brokerPublish } - ctx.brokerPublish(subs, addedSubDone.bind(ctx)) + ctx.brokerPublish(subs, function () { + cb(null, client) + }) } CachedPersistence.prototype.subscriptionsByTopic = function (topic, cb) { From e0bdad67f79fd2fb204f3ef34d55aa018ae126f7 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 14 Jul 2017 22:47:53 +0430 Subject: [PATCH 4/4] bring _waitFor inside cached-persistence --- index.js | 52 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/index.js b/index.js index 3c33f89..330d21d 100644 --- a/index.js +++ b/index.js @@ -50,8 +50,9 @@ function CachedPersistence (opts) { .forEach(rmSub, that._matcher) } } - var waiting = that._waiting[clientId] - delete that._waiting[clientId] + var action = packet.topic === newSubTopic ? 'sub' : 'unsub' + var waiting = that._waiting[clientId + '-' + action] + delete that._waiting[clientId + '-' + action] if (waiting) { process.nextTick(waiting) } @@ -69,8 +70,8 @@ function rmSub (sub) { inherits(CachedPersistence, EE) -CachedPersistence.prototype._waitFor = function (client, cb) { - this._waiting[client.id] = cb +CachedPersistence.prototype._waitFor = function (client, action, cb) { + this._waiting[client.id + '-' + action] = cb } CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { @@ -79,7 +80,21 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { return } + var errored = false + + this._waitFor(client, 'sub', function (err) { + if (!errored && err) { + return cb(err) + } + if (!errored) { + cb(null, client) + } + }) + subs = subs.filter(qosGreaterThanOne) + if (subs.length === 0) { + return cb(null, client) + } var ctx = { cb: cb || noop, @@ -88,8 +103,11 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { topic: newSubTopic, brokerPublish: brokerPublish } - ctx.brokerPublish(subs, function () { - cb(null, client) + ctx.brokerPublish(subs, function (err) { + if (err) { + errored = true + cb(err) + } }) } @@ -109,6 +127,21 @@ function brokerPublish (subs, cb) { function noop () {} CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) { + if (!this.ready) { + this.once('ready', this._removedSubscriptions.bind(this, client, subs, cb)) + return + } + var errored = false + + this._waitFor(client, 'unsub', function (err) { + if (!errored && err) { + return cb(err) + } + if (!errored) { + cb(null, client) + } + }) + var ctx = { cb: cb || noop, client: client, @@ -116,8 +149,11 @@ CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) topic: rmSubTopic, brokerPublish: brokerPublish } - ctx.brokerPublish(subs, function () { - cb(null, client) + ctx.brokerPublish(subs, function (err) { + if (err) { + errored = true + cb(err) + } }) }