Skip to content

Commit

Permalink
bring _waitFor inside cached-persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
behrad committed Jul 14, 2017
1 parent e459231 commit e0bdad6
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ function CachedPersistence (opts) {
.forEach(rmSub, that._matcher)
}
}
var waiting = that._waiting[clientId]
delete that._waiting[clientId]
var action = packet.topic === newSubTopic ? 'sub' : 'unsub'
var waiting = that._waiting[clientId + '-' + action]
delete that._waiting[clientId + '-' + action]
if (waiting) {
process.nextTick(waiting)
}
Expand All @@ -69,8 +70,8 @@ function rmSub (sub) {

inherits(CachedPersistence, EE)

CachedPersistence.prototype._waitFor = function (client, cb) {
this._waiting[client.id] = cb
CachedPersistence.prototype._waitFor = function (client, action, cb) {
this._waiting[client.id + '-' + action] = cb
}

CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) {
Expand All @@ -79,7 +80,21 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) {
return
}

var errored = false

this._waitFor(client, 'sub', function (err) {
if (!errored && err) {
return cb(err)
}
if (!errored) {
cb(null, client)
}
})

subs = subs.filter(qosGreaterThanOne)
if (subs.length === 0) {
return cb(null, client)
}

var ctx = {
cb: cb || noop,
Expand All @@ -88,8 +103,11 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) {
topic: newSubTopic,
brokerPublish: brokerPublish
}
ctx.brokerPublish(subs, function () {
cb(null, client)
ctx.brokerPublish(subs, function (err) {
if (err) {
errored = true
cb(err)
}
})
}

Expand All @@ -109,15 +127,33 @@ function brokerPublish (subs, cb) {
function noop () {}

CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) {
if (!this.ready) {
this.once('ready', this._removedSubscriptions.bind(this, client, subs, cb))
return
}
var errored = false

this._waitFor(client, 'unsub', function (err) {
if (!errored && err) {
return cb(err)
}
if (!errored) {
cb(null, client)
}
})

var ctx = {
cb: cb || noop,
client: client,
broker: this._broker,
topic: rmSubTopic,
brokerPublish: brokerPublish
}
ctx.brokerPublish(subs, function () {
cb(null, client)
ctx.brokerPublish(subs, function (err) {
if (err) {
errored = true
cb(err)
}
})
}

Expand Down

0 comments on commit e0bdad6

Please sign in to comment.