From e0bdad67f79fd2fb204f3ef34d55aa018ae126f7 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 14 Jul 2017 22:47:53 +0430 Subject: [PATCH] bring _waitFor inside cached-persistence --- index.js | 52 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/index.js b/index.js index 3c33f89..330d21d 100644 --- a/index.js +++ b/index.js @@ -50,8 +50,9 @@ function CachedPersistence (opts) { .forEach(rmSub, that._matcher) } } - var waiting = that._waiting[clientId] - delete that._waiting[clientId] + var action = packet.topic === newSubTopic ? 'sub' : 'unsub' + var waiting = that._waiting[clientId + '-' + action] + delete that._waiting[clientId + '-' + action] if (waiting) { process.nextTick(waiting) } @@ -69,8 +70,8 @@ function rmSub (sub) { inherits(CachedPersistence, EE) -CachedPersistence.prototype._waitFor = function (client, cb) { - this._waiting[client.id] = cb +CachedPersistence.prototype._waitFor = function (client, action, cb) { + this._waiting[client.id + '-' + action] = cb } CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { @@ -79,7 +80,21 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { return } + var errored = false + + this._waitFor(client, 'sub', function (err) { + if (!errored && err) { + return cb(err) + } + if (!errored) { + cb(null, client) + } + }) + subs = subs.filter(qosGreaterThanOne) + if (subs.length === 0) { + return cb(null, client) + } var ctx = { cb: cb || noop, @@ -88,8 +103,11 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { topic: newSubTopic, brokerPublish: brokerPublish } - ctx.brokerPublish(subs, function () { - cb(null, client) + ctx.brokerPublish(subs, function (err) { + if (err) { + errored = true + cb(err) + } }) } @@ -109,6 +127,21 @@ function brokerPublish (subs, cb) { function noop () {} CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) { + if (!this.ready) { + this.once('ready', this._removedSubscriptions.bind(this, client, subs, cb)) + return + } + var errored = false + + this._waitFor(client, 'unsub', function (err) { + if (!errored && err) { + return cb(err) + } + if (!errored) { + cb(null, client) + } + }) + var ctx = { cb: cb || noop, client: client, @@ -116,8 +149,11 @@ CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) topic: rmSubTopic, brokerPublish: brokerPublish } - ctx.brokerPublish(subs, function () { - cb(null, client) + ctx.brokerPublish(subs, function (err) { + if (err) { + errored = true + cb(err) + } }) }