Skip to content

Commit

Permalink
Merge pull request #7 from behrad/publish_clientSub
Browse files Browse the repository at this point in the history
Publish client sub
  • Loading branch information
mcollina authored Jul 17, 2017
2 parents f0525c8 + e0bdad6 commit 9773d4e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 35 deletions.
103 changes: 69 additions & 34 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ var Qlobber = require('qlobber').Qlobber
var Packet = require('aedes-packet')
var EE = require('events').EventEmitter
var inherits = require('util').inherits
var fastparallel = require('fastparallel')
var MultiStream = require('multistream')

var QlobberOpts = {
Expand All @@ -27,7 +26,6 @@ function CachedPersistence (opts) {
this.destroyed = false
this._matcher = new Qlobber(QlobberOpts)
this._waiting = {}
this._parallel = fastparallel({ results: false })

var that = this

Expand All @@ -37,19 +35,24 @@ function CachedPersistence (opts) {

this._onMessage = function onSubMessage (packet, cb) {
var decoded = JSON.parse(packet.payload)
if (packet.topic === newSubTopic) {
if (!checkSubsForClient(decoded, that._matcher.match(decoded.topic))) {
that._matcher.add(decoded.topic, decoded)
var clientId
for (var i = 0; i < decoded.subs.length; i++) {
var sub = decoded.subs[i]
clientId = sub.clientId
if (packet.topic === newSubTopic) {
if (!checkSubsForClient(sub, that._matcher.match(sub.topic))) {
that._matcher.add(sub.topic, sub)
}
} else if (packet.topic === rmSubTopic) {
that._matcher
.match(sub.topic)
.filter(matching, sub)
.forEach(rmSub, that._matcher)
}
} else if (packet.topic === rmSubTopic) {
that._matcher
.match(decoded.topic)
.filter(matching, decoded)
.forEach(rmSub, that._matcher)
}
var key = decoded.clientId + '-' + decoded.topic
var waiting = that._waiting[key]
delete that._waiting[key]
var action = packet.topic === newSubTopic ? 'sub' : 'unsub'
var waiting = that._waiting[clientId + '-' + action]
delete that._waiting[clientId + '-' + action]
if (waiting) {
process.nextTick(waiting)
}
Expand All @@ -65,16 +68,10 @@ function rmSub (sub) {
this.remove(sub.topic, sub)
}

function Sub (clientId, topic, qos) {
this.clientId = clientId
this.topic = topic
this.qos = qos
}

inherits(CachedPersistence, EE)

CachedPersistence.prototype._waitFor = function (client, topic, cb) {
this._waiting[client.id + '-' + topic] = cb
CachedPersistence.prototype._waitFor = function (client, action, cb) {
this._waiting[client.id + '-' + action] = cb
}

CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) {
Expand All @@ -83,43 +80,81 @@ CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) {
return
}

var errored = false

this._waitFor(client, 'sub', function (err) {
if (!errored && err) {
return cb(err)
}
if (!errored) {
cb(null, client)
}
})

subs = subs.filter(qosGreaterThanOne)
if (subs.length === 0) {
return cb(null, client)
}

this._parallel({
var ctx = {
cb: cb || noop,
client: client,
broker: this._broker,
topic: newSubTopic
}, brokerPublish, subs, addedSubDone)
topic: newSubTopic,
brokerPublish: brokerPublish
}
ctx.brokerPublish(subs, function (err) {
if (err) {
errored = true
cb(err)
}
})
}

function qosGreaterThanOne (sub) {
return sub.qos > 0
}

function brokerPublish (sub, cb) {
var client = this.client
var encoded = JSON.stringify(new Sub(client.id, sub.topic, sub.qos))
function brokerPublish (subs, cb) {
var encoded = JSON.stringify({subs: subs})
var packet = new Packet({
topic: this.topic,
payload: encoded
})
this.broker.publish(packet, cb)
}

function addedSubDone () {
this.cb(null, this.client)
}

function noop () {}

CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) {
this._parallel({
if (!this.ready) {
this.once('ready', this._removedSubscriptions.bind(this, client, subs, cb))
return
}
var errored = false

this._waitFor(client, 'unsub', function (err) {
if (!errored && err) {
return cb(err)
}
if (!errored) {
cb(null, client)
}
})

var ctx = {
cb: cb || noop,
client: client,
broker: this._broker,
topic: rmSubTopic
}, brokerPublish, subs, addedSubDone)
topic: rmSubTopic,
brokerPublish: brokerPublish
}
ctx.brokerPublish(subs, function (err) {
if (err) {
errored = true
cb(err)
}
})
}

CachedPersistence.prototype.subscriptionsByTopic = function (topic, cb) {
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
"dependencies": {
"aedes-packet": "^1.0.0",
"aedes-persistence": "^4.0.0",
"fastparallel": "^2.2.1",
"multistream": "^2.1.0",
"qlobber": "^0.8.0"
}
Expand Down

0 comments on commit 9773d4e

Please sign in to comment.