Skip to content
This repository has been archived by the owner on Aug 24, 2020. It is now read-only.

Commit

Permalink
Add option for manual connection management
Browse files Browse the repository at this point in the history
  • Loading branch information
wermerb committed May 21, 2018
1 parent 41a9f66 commit 88fa39e
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 20 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ It is also possible to import into multiple modules if you need multiple mqtt co
* You can provide any configuration that is supported by MQTT.js.
*/
NgxMqttClientModule.withOptions({
manageConnectionManually: true, //this flag will prevent the service to connection automatically
host: 'broker.hivemq.com',
protocol: 'ws',
port: 8000,
Expand Down Expand Up @@ -70,6 +71,15 @@ export class AppComponent implements OnDestroy {
this.status.push(`Mqtt client connection status: ${status}`);
});
}

/**
* Manages connection manually.
* If there is an active connection this will forcefully disconnect that first.
* @param {IClientOptions} config
*/
connect(config: IClientOptions): void {
this._mqttService.connect(config);
}

/**
* Subscribes to fooBar topic.
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ngx-mqtt-client",
"version": "1.3.3",
"version": "1.3.4",
"license": "MIT",
"scripts": {
"ng": "ng",
Expand All @@ -18,9 +18,9 @@
},
"peerDependencies": {
"@angular/core": ">2.0.0",
"mqtt": "^2.18.0",
"rxjs": ">5.5.0",
"typescript": ">2.4.0",
"mqtt": "^2.17.0"
"typescript": ">2.4.0"
},
"devDependencies": {
"@angular/animations": "^5.2.9",
Expand Down Expand Up @@ -49,6 +49,7 @@
"karma-coverage-istanbul-reporter": "^1.2.1",
"karma-jasmine": "~1.1.0",
"karma-jasmine-html-reporter": "^0.2.2",
"mqtt": "^2.18.0",
"ng-packagr": "^2.4.1",
"protractor": "~5.1.2",
"puppeteer": "^1.2.0",
Expand Down
10 changes: 10 additions & 0 deletions src/app/app.component.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {Component, OnDestroy} from '@angular/core';
import {ConnectionStatus, MqttService, SubscriptionGrant} from './ngx-mqtt-client';
import {IClientOptions} from 'mqtt';

export interface Foo {
bar: string;
Expand Down Expand Up @@ -27,6 +28,15 @@ export class AppComponent implements OnDestroy {
});
}

/**
* Manages connection manually.
* If there is an active connection this will forcefully disconnect that first.
* @param {IClientOptions} config
*/
connect(config: IClientOptions): void {
this._mqttService.connect(config);
}

/**
* Subscribes to fooBar topic.
* The first emitted value will be a {@see SubscriptionGrant} to confirm your subscription was successful.
Expand Down
7 changes: 7 additions & 0 deletions src/app/ngx-mqtt-client/models/mqtt-config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {IClientOptions} from 'mqtt';

export interface MqttConfig extends IClientOptions {

manageConnectionManually?: boolean;

}
10 changes: 9 additions & 1 deletion src/app/ngx-mqtt-client/services/mqtt.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('MqttService', () => {
]
});

client = jasmine.createSpyObj('client', ['on', 'subscribe', 'unsubscribe', 'publish', 'end']);
client = jasmine.createSpyObj('client', ['on', 'subscribe', 'unsubscribe', 'publish', 'end', 'connected']);
client.on.and.callFake((key: string, value: any) => {
clientOnStore[key] = value;
});
Expand Down Expand Up @@ -190,6 +190,14 @@ describe('MqttService', () => {
});
});

describe('connect', () => {
it('should disconnect first if there is an active connection', () => {
sut.connect({username: 'foo', password: 'bar'});

expect(client.end).toHaveBeenCalledWith(true);
});
});

describe('status', () => {
it('should emit status', () => {
let counter = 0;
Expand Down
18 changes: 15 additions & 3 deletions src/app/ngx-mqtt-client/services/mqtt.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'rxjs/add/observable/throw';
import {ErrorObservable} from 'rxjs/observable/ErrorObservable';
import {concat} from 'rxjs/observable/concat';
import {MQTT_MOCK} from '../tokens/mqtt-mock.injection-token';
import {MqttConfig} from '../models/mqtt-config';

@Injectable()
export class MqttService {
Expand All @@ -25,9 +26,20 @@ export class MqttService {

private _store: { [topic: string]: TopicStore<any> } = {};

constructor(@Inject(MQTT_CONFIG) config: IClientOptions,
@Optional() @Inject(MQTT_MOCK) mqttMock) {
this._client = mqttMock ? mqttMock.connect(null, config) : mqtt.connect(null, config);
constructor(@Inject(MQTT_CONFIG) config: MqttConfig,
@Optional() @Inject(MQTT_MOCK) private _mqttMock) {

if (!config.manageConnectionManually) {
this.connect(config);
}
}

connect(config: IClientOptions): void {
if (this._client && this._client.connected) {
this._client.end(true);
}

this._client = this._mqttMock ? this._mqttMock.connect(null, config) : mqtt.connect(null, config);
this._client.on('message', (topic, message) => this.updateTopic(topic, message.toString()));
this._client.on('offline', () => this._status.next(ConnectionStatus.DISCONNECTED));
this._client.on('connect', () => this._status.next(ConnectionStatus.CONNECTED));
Expand Down
4 changes: 2 additions & 2 deletions src/app/ngx-mqtt-client/tokens/mqtt-config.injection-token.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {InjectionToken} from '@angular/core';
import {IClientOptions} from 'mqtt';
import {MqttConfig} from '../models/mqtt-config';

export const MQTT_CONFIG = new InjectionToken<IClientOptions>('mqtt configuration');
export const MQTT_CONFIG = new InjectionToken<MqttConfig>('mqtt configuration');
57 changes: 46 additions & 11 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2049,7 +2049,7 @@ duplexer@^0.1.1:
version "0.1.1"
resolved "https://registry.yarnpkg.com/duplexer/-/duplexer-0.1.1.tgz#ace6ff808c1ce66b57d1ebf97977acb02334cfc1"

duplexify@^3.4.2, duplexify@^3.5.1, duplexify@^3.5.3:
duplexify@^3.4.2, duplexify@^3.5.3:
version "3.5.4"
resolved "https://registry.yarnpkg.com/duplexify/-/duplexify-3.5.4.tgz#4bb46c1796eabebeec4ca9a2e66b808cb7a3d8b4"
dependencies:
Expand All @@ -2058,6 +2058,15 @@ duplexify@^3.4.2, duplexify@^3.5.1, duplexify@^3.5.3:
readable-stream "^2.0.0"
stream-shift "^1.0.0"

duplexify@^3.5.1, duplexify@^3.6.0:
version "3.6.0"
resolved "https://registry.yarnpkg.com/duplexify/-/duplexify-3.6.0.tgz#592903f5d80b38d037220541264d69a198fb3410"
dependencies:
end-of-stream "^1.0.0"
inherits "^2.0.1"
readable-stream "^2.0.0"
stream-shift "^1.0.0"

ecc-jsbn@~0.1.1:
version "0.1.1"
resolved "https://registry.yarnpkg.com/ecc-jsbn/-/ecc-jsbn-0.1.1.tgz#0fc73a9ed5f0d53c38193398523ef7e543777505"
Expand Down Expand Up @@ -4589,28 +4598,28 @@ move-concurrently@^1.0.1:
rimraf "^2.5.4"
run-queue "^1.0.3"

mqtt-packet@^5.5.0:
version "5.5.0"
resolved "https://registry.yarnpkg.com/mqtt-packet/-/mqtt-packet-5.5.0.tgz#7f53244ba49fdecf795e950c14a9432dbf33bc63"
mqtt-packet@^5.6.0:
version "5.6.0"
resolved "https://registry.yarnpkg.com/mqtt-packet/-/mqtt-packet-5.6.0.tgz#923fb704d0ce0bd6ac81c7e1cc09469b1512d2fd"
dependencies:
bl "^1.2.1"
inherits "^2.0.3"
process-nextick-args "^2.0.0"
safe-buffer "^5.1.0"

mqtt@^2.17.0:
version "2.17.0"
resolved "https://registry.yarnpkg.com/mqtt/-/mqtt-2.17.0.tgz#5630718a6bfe297e768ea1966df5b55fb0eb414b"
mqtt@^2.18.0:
version "2.18.0"
resolved "https://registry.yarnpkg.com/mqtt/-/mqtt-2.18.0.tgz#e3b239263a99a318beaf2cd5b8ef00a3d4eb7a60"
dependencies:
commist "^1.0.0"
concat-stream "^1.6.2"
end-of-stream "^1.4.1"
help-me "^1.0.1"
inherits "^2.0.3"
minimist "^1.2.0"
mqtt-packet "^5.5.0"
mqtt-packet "^5.6.0"
pump "^3.0.0"
readable-stream "^2.3.5"
readable-stream "^2.3.6"
reinterval "^1.1.0"
split2 "^2.1.1"
websocket-stream "^5.1.2"
Expand Down Expand Up @@ -5540,14 +5549,22 @@ pump@^3.0.0:
end-of-stream "^1.1.0"
once "^1.3.1"

pumpify@^1.3.3, pumpify@^1.3.5:
pumpify@^1.3.3:
version "1.4.0"
resolved "https://registry.yarnpkg.com/pumpify/-/pumpify-1.4.0.tgz#80b7c5df7e24153d03f0e7ac8a05a5d068bd07fb"
dependencies:
duplexify "^3.5.3"
inherits "^2.0.3"
pump "^2.0.0"

pumpify@^1.3.5:
version "1.5.1"
resolved "https://registry.yarnpkg.com/pumpify/-/pumpify-1.5.1.tgz#36513be246ab27570b1a374a5ce278bfd74370ce"
dependencies:
duplexify "^3.6.0"
inherits "^2.0.3"
pump "^2.0.0"

[email protected]:
version "1.3.2"
resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.3.2.tgz#9653a036fb7c1ee42342f2325cceefea3926c48d"
Expand Down Expand Up @@ -5716,7 +5733,7 @@ read-pkg@^3.0.0:
normalize-package-data "^2.3.2"
path-type "^3.0.0"

"readable-stream@1 || 2", readable-stream@2, "readable-stream@> 1.0.0 < 3.0.0", readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stream@^2.0.2, readable-stream@^2.0.4, readable-stream@^2.0.6, readable-stream@^2.1.4, readable-stream@^2.1.5, readable-stream@^2.2.2, readable-stream@^2.2.9, readable-stream@^2.3.0, readable-stream@^2.3.3, readable-stream@^2.3.5:
"readable-stream@1 || 2", readable-stream@2, readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stream@^2.0.2, readable-stream@^2.0.4, readable-stream@^2.0.6, readable-stream@^2.1.4, readable-stream@^2.1.5, readable-stream@^2.2.2, readable-stream@^2.2.9, readable-stream@^2.3.0, readable-stream@^2.3.3:
version "2.3.5"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.3.5.tgz#b4f85003a938cbb6ecbce2a124fb1012bd1a838d"
dependencies:
Expand Down Expand Up @@ -5746,6 +5763,18 @@ [email protected], "[email protected] >=1.1.9":
isarray "0.0.1"
string_decoder "~0.10.x"

"readable-stream@> 1.0.0 < 3.0.0", readable-stream@^2.3.5, readable-stream@^2.3.6:
version "2.3.6"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.3.6.tgz#b11c27d88b8ff1fbe070643cf94b0c79ae1b0aaf"
dependencies:
core-util-is "~1.0.0"
inherits "~2.0.3"
isarray "~1.0.0"
process-nextick-args "~2.0.0"
safe-buffer "~5.1.1"
string_decoder "~1.1.1"
util-deprecate "~1.0.1"

readable-stream@~2.0.0, readable-stream@~2.0.5:
version "2.0.6"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.0.6.tgz#8f90341e68a53ccc928788dacfcd11b36eb9b78e"
Expand Down Expand Up @@ -6784,6 +6813,12 @@ string_decoder@~1.0.0, string_decoder@~1.0.3:
dependencies:
safe-buffer "~5.1.0"

string_decoder@~1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.1.1.tgz#9cf1611ba62685d7030ae9e4ba34149c3af03fc8"
dependencies:
safe-buffer "~5.1.0"

stringstream@~0.0.4, stringstream@~0.0.5:
version "0.0.5"
resolved "https://registry.yarnpkg.com/stringstream/-/stringstream-0.0.5.tgz#4e484cd4de5a0bbbee18e46307710a8a81621878"
Expand Down

0 comments on commit 88fa39e

Please sign in to comment.