diff --git a/README.md b/README.md index 3c006e5..7af772e 100644 --- a/README.md +++ b/README.md @@ -41,45 +41,30 @@ npm install aedes-cached-persistence --save In order to reuse aedes-cached-persistence, you need to: ```js -'use strict' - -var util = require('util') -var CachedPersistence = require('aedes-cached-persistence') +const CachedPersistence = require('aedes-cached-persistence') // if you need http://npm.im/aedes-packet, it is available // from this module as well -// var Packet = CachedPersistence.Packet - -function MyPersistence (opts) { - if (!(this instanceof MyPersistence)) { - return new MyPersistence(opts) - } - // initialize your data here - - CachedPersistence.call(this, opts) -} - -util.inherits(MyPersistence, CachedPersistence) - -MyPersistence.prototype.addSubscriptions = function (client, subs, cb) { - // ..persistence specific implementation.. - - // call this._addedSubscriptions when you are done - this._addedSubscriptions(client, subsObjs, cb) -} - -MyPersistence.prototype.removeSubscriptions = function (client, subs, cb) { - // ..persistence specific implementation.. - - // call this._addedSubscriptions when you are done - this._removedSubscriptions(client, subs.map(subs, client), cb) +// const { Packet } = CachedPersistence + +class MyPersistence extends CachedPersistence { + constructor(opts) { + super(opts) + } + addSubscriptions(client, subs, cb) { + // ..persistence specific implementation.. + // call super._addedSubscriptions when you are done + super._addedSubscriptions(client, subs.map(mapSub), cb) + } + removeSubscriptions(client, subs, cb) { + // ..persistence specific implementation.. + // call super._removedSubscriptions when you are done + super._removedSubscriptions(client, subs.map(mapSub), cb) + } } -function toSubObj (sub) { - return { - clientId: this.id, - topic: sub.topic - } +function mapSub (sub) { + return { topic: sub.topic } } ``` diff --git a/index.js b/index.js index 0b1f3fb..5c9fc40 100644 --- a/index.js +++ b/index.js @@ -1,12 +1,8 @@ -/* eslint-disable no-var */ -'use strict' - const QlobberSub = require('qlobber/aedes/qlobber-sub') const { Packet } = require('aedes-persistence') -const EE = require('events').EventEmitter -const inherits = require('util').inherits const MultiStream = require('multistream') const parallel = require('fastparallel') +const { EventEmitter } = require('events') const QlobberOpts = { wildcard_one: '+', wildcard_some: '#', @@ -17,208 +13,203 @@ const newSubTopic = '$SYS/sub/add' const rmSubTopic = '$SYS/sub/rm' const subTopic = '$SYS/sub/+' -function CachedPersistence (opts) { - if (!(this instanceof CachedPersistence)) { - return new CachedPersistence(opts) - } - - EE.call(this) +class CachedPersistence extends EventEmitter { + constructor (opts) { + super() - this.ready = false - this.destroyed = false - this._parallel = parallel() - this._trie = new QlobberSub(QlobberOpts) - this._waiting = {} + this.ready = false + this.destroyed = false + this._parallel = parallel() + this._trie = new QlobberSub(QlobberOpts) + this._waiting = new Map() - const that = this + this.once('ready', () => { + this.ready = true + }) - this.once('ready', function () { - that.ready = true - }) + this._onSubMessage = this._onMessage.bind(this) + } - this._onMessage = function onSubMessage (packet, cb) { + _onMessage (packet, cb) { const decoded = JSON.parse(packet.payload) const clientId = decoded.clientId - for (var i = 0; i < decoded.subs.length; i++) { + for (let i = 0; i < decoded.subs.length; i++) { const sub = decoded.subs[i] sub.clientId = clientId if (packet.topic === newSubTopic) { if (sub.qos > 0) { - that._trie.add(sub.topic, sub) + this._trie.add(sub.topic, sub) } else { - that._trie.remove(sub.topic, sub) + this._trie.remove(sub.topic, sub) } } else if (packet.topic === rmSubTopic) { - that._trie.remove(sub.topic, sub) + this._trie.remove(sub.topic, sub) } } - const action = packet.topic === newSubTopic ? 'sub_' : 'unsub_' - var key = clientId + '-' + action + if (decoded.subs.length > 0) { - key = clientId + '-' + action + decoded.subs[0].topic - } - const waiting = that._waiting[key] - that._waiting[key] = undefined - if (waiting) { - process.nextTick(waiting) + const key = getKey(clientId, packet.topic === newSubTopic, decoded.subs[0].topic) + const waiting = this._waiting.get(key) + if (waiting) { + this._waiting.delete(key) + process.nextTick(waiting) + } } cb() } -} -inherits(CachedPersistence, EE) - -CachedPersistence.prototype._waitFor = function (client, action, cb) { - this._waiting[client.id + '-' + action] = cb -} + get broker () { + return this._broker + } -CachedPersistence.prototype._addedSubscriptions = function (client, subs, cb) { - if (!this.ready) { - this.once('ready', this._addedSubscriptions.bind(this, client, subs, cb)) - return + set broker (broker) { + this._broker = broker + this.broker.subscribe(subTopic, this._onSubMessage, this._setup.bind(this)) } - var errored = false + _waitFor (client, isSub, topic, cb) { + this._waiting.set(getKey(client.id, isSub, topic), cb) + } - this._waitFor(client, 'sub_' + subs[0].topic, function (err) { - if (!errored && err) { - return cb(err) + _addedSubscriptions (client, subs, cb) { + if (!this.ready) { + this.once('ready', this._addedSubscriptions.bind(this, client, subs, cb)) + return } - if (!errored) { - cb(null, client) + + if (subs.length === 0) { + return cb(null, client) } - }) - if (subs.length === 0) { - return cb(null, client) - } + let errored = false - const ctx = { - cb: cb || noop, - client: client, - broker: this._broker, - topic: newSubTopic, - brokerPublish: brokerPublish + this._waitFor(client, true, subs[0].topic, (err) => { + if (!errored && err) { + return cb(err) + } + if (!errored) { + cb(null, client) + } + }) + + const ctx = { + cb: cb || noop, + client, + broker: this._broker, + topic: newSubTopic, + brokerPublish + } + ctx.brokerPublish(subs, (err) => { + if (err) { + errored = true + cb(err) + } + }) } - ctx.brokerPublish(subs, function (err) { - if (err) { - errored = true - cb(err) + + _removedSubscriptions (client, subs, cb) { + if (!this.ready) { + this.once('ready', this._removedSubscriptions.bind(this, client, subs, cb)) + return } - }) -} + let errored = false + let key = subs -function brokerPublish (subs, cb) { - const encoded = JSON.stringify({ clientId: this.client.id, subs: subs }) - const packet = new Packet({ - topic: this.topic, - payload: encoded - }) - this.broker.publish(packet, cb) -} + if (subs.length > 0) { + key = subs[0].topic + } + this._waitFor(client, false, key, (err) => { + if (!errored && err) { + return cb(err) + } + if (!errored) { + cb(null, client) + } + }) + + const ctx = { + cb: cb || noop, + client, + broker: this._broker, + topic: rmSubTopic, + brokerPublish + } + ctx.brokerPublish(subs, (err) => { + if (err) { + errored = true + cb(err) + } + }) + } -function noop () {} + subscriptionsByTopic (topic, cb) { + if (!this.ready) { + this.once('ready', this.subscriptionsByTopic.bind(this, topic, cb)) + return this + } -CachedPersistence.prototype._removedSubscriptions = function (client, subs, cb) { - if (!this.ready) { - this.once('ready', this._removedSubscriptions.bind(this, client, subs, cb)) - return + cb(null, this._trie.match(topic)) } - var errored = false - var key = subs - if (subs.length > 0) { - key = subs[0].topic + cleanSubscriptions (client, cb) { + this.subscriptionsByClient(client, (err, subs, client) => { + if (err || !subs) { return cb(err, client) } + subs = subs.map(subToTopic) + this.removeSubscriptions(client, subs, cb) + }) } - this._waitFor(client, 'unsub_' + key, function (err) { - if (!errored && err) { - return cb(err) - } - if (!errored) { - cb(null, client) - } - }) - const ctx = { - cb: cb || noop, - client: client, - broker: this._broker, - topic: rmSubTopic, - brokerPublish: brokerPublish + outgoingEnqueueCombi (subs, packet, cb) { + this._parallel({ + persistence: this, + packet + }, outgoingEnqueue, subs, cb) + } + + createRetainedStreamCombi (patterns) { + const streams = patterns.map((p) => { + return this.createRetainedStream(p) + }) + return MultiStream.obj(streams) } - ctx.brokerPublish(subs, function (err) { - if (err) { - errored = true - cb(err) - } - }) -} -CachedPersistence.prototype.subscriptionsByTopic = function (topic, cb) { - if (!this.ready) { - this.once('ready', this.subscriptionsByTopic.bind(this, topic, cb)) - return this + destroy (cb) { + this.destroyed = true + this.broker.unsubscribe(subTopic, this._onSubMessage, () => { + if (cb) { + cb() + } + }) } - cb(null, this._trie.match(topic)) + // must emit 'ready' + _setup () { + this.emit('ready') + } } -CachedPersistence.prototype.cleanSubscriptions = function (client, cb) { - const that = this - this.subscriptionsByClient(client, function (err, subs, client) { - if (err || !subs) { return cb(err, client) } - subs = subs.map(subToTopic) - that.removeSubscriptions(client, subs, cb) +function brokerPublish (subs, cb) { + const encoded = JSON.stringify({ clientId: this.client.id, subs }) + const packet = new Packet({ + topic: this.topic, + payload: encoded }) + this.broker.publish(packet, cb) } -CachedPersistence.prototype.outgoingEnqueueCombi = function (subs, packet, cb) { - this._parallel({ - persistence: this, - packet: packet - }, outgoingEnqueue, subs, cb) -} +function noop () { } function outgoingEnqueue (sub, cb) { this.persistence.outgoingEnqueue(sub, this.packet, cb) } -CachedPersistence.prototype.createRetainedStreamCombi = function (patterns) { - const that = this - const streams = patterns.map(function (p) { - return that.createRetainedStream(p) - }) - return MultiStream.obj(streams) -} - -CachedPersistence.prototype.destroy = function (cb) { - this.destroyed = true - this.broker.unsubscribe(subTopic, this._onMessage, function () { - if (cb) { - cb() - } - }) -} - -// must emit 'ready' -CachedPersistence.prototype._setup = function () { - this.emit('ready') -} - function subToTopic (sub) { return sub.topic } -Object.defineProperty(CachedPersistence.prototype, 'broker', { - enumerable: false, - get: function () { - return this._broker - }, - set: function (broker) { - this._broker = broker - this.broker.subscribe(subTopic, this._onMessage, this._setup.bind(this)) - } -}) +function getKey (clientId, isSub, topic) { + return clientId + '-' + (isSub ? 'sub_' : 'unsub_') + (topic || '') +} module.exports = CachedPersistence module.exports.Packet = Packet diff --git a/package.json b/package.json index 6c41def..3df6de9 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "test" ], "engines": { - "node": ">=10" + "node": ">=14" }, "repository": { "type": "git", @@ -54,24 +54,24 @@ }, "homepage": "https://github.com/moscajs/aedes-cached-persistence#readme", "devDependencies": { - "aedes": "^0.45.0", + "aedes": "^0.46.3", "concat-stream": "^2.0.0", "faucet": "0.0.1", "license-checker": "^25.0.1", - "mqemitter": "^4.4.1", + "mqemitter": "^4.5.0", "nyc": "^15.1.0", "pump": "^3.0.0", - "release-it": "^14.2.2", + "release-it": "^15.0.0", "snazzy": "^9.0.0", - "standard": "^16.0.3", + "standard": "^17.0.0", "tape": "^5.2.1", "through2": "^4.0.2", - "tsd": "^0.14.0" + "tsd": "^0.20.0" }, "dependencies": { "aedes-persistence": "^9.1.2", - "fastparallel": "^2.4.0", - "multistream": "^4.0.1", + "fastparallel": "^2.4.1", + "multistream": "^4.1.0", "qlobber": "^7.0.0" } } diff --git a/test.js b/test.js index 13a2e06..58da531 100644 --- a/test.js +++ b/test.js @@ -31,7 +31,7 @@ class MyPersistence extends CachedPersistence { if (err) { return cb(err) } - this._addedSubscriptions(client, subs, cb) + super._addedSubscriptions(client, subs, cb) }) } @@ -43,7 +43,7 @@ class MyPersistence extends CachedPersistence { const subsObjs = topics.map(function mapSub (topic) { return { topic } }) - this._removedSubscriptions(client, subsObjs, cb) + super._removedSubscriptions(client, subsObjs, cb) }) } }