Skip to content
This repository has been archived by the owner on Aug 24, 2020. It is now read-only.

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
wermerb committed Mar 22, 2018
1 parent 2163fc4 commit 3110961
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 26 deletions.
13 changes: 0 additions & 13 deletions .editorconfig

This file was deleted.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ testem.log
# System Files
.DS_Store
Thumbs.db
*.iml
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@angular/platform-browser-dynamic": "^5.2.0",
"@angular/router": "^5.2.0",
"core-js": "^2.4.1",
"mqtt": "^2.16.0",
"rxjs": "^5.5.6",
"zone.js": "^0.8.19"
},
Expand Down
4 changes: 4 additions & 0 deletions src/app/ngx-mqtt-client/models/mqtt-config.injection-token.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import {InjectionToken} from '@angular/core';
import {IClientOptions} from 'mqtt';

export const MQTT_CONFIG = new InjectionToken<IClientOptions>('mqtt configuration');
18 changes: 18 additions & 0 deletions src/app/ngx-mqtt-client/ngx-mqtt-client.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {ModuleWithProviders, NgModule} from '@angular/core';
import {MqttService} from './services/mqtt.service';
import {MQTT_CONFIG} from './models/mqtt-config.injection-token';
import {IClientOptions} from 'mqtt';

@NgModule({
providers: [MqttService]
})
export class NgxMqttClientModule {
static forRoot(config: IClientOptions): ModuleWithProviders {
return {
ngModule: NgxMqttClientModule,
providers: [
{provide: MQTT_CONFIG, useValue: config}
]
};
}
}
118 changes: 118 additions & 0 deletions src/app/ngx-mqtt-client/services/mqtt.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import {Inject, Injectable} from '@angular/core';
import * as mqtt from 'mqtt';
import {IClientOptions, IClientPublishOptions, ISubscriptionGrant, MqttClient} from 'mqtt';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {MQTT_CONFIG} from '../models/mqtt-config.injection-token';
import {fromPromise} from 'rxjs/observable/fromPromise';
import {map, switchMap} from 'rxjs/operators';
import {of} from 'rxjs/observable/of';
import {empty} from 'rxjs/observable/empty';

@Injectable()
export class MqttService {

private _client: MqttClient;

private _store: { [topic: string]: Subject<any> } = {};

constructor(@Inject(MQTT_CONFIG) config: IClientOptions) {
this._client = mqtt.connect(null, config);
this._client.on('message', (topic, message) => this.updateTopic(topic, message.toString()));
}

subscribeToTopic<T>(topic: string): Observable<T | Error> {
return fromPromise(new Promise((resolve, reject) => {
this._client.subscribe(topic, (error: Error, granted: Array<ISubscriptionGrant>) => {
if (error) {
reject(error);
}

resolve(granted);
});
})).pipe(
switchMap(() => this.addTopic<T>(topic))
);
}

unsubscribeFromTopic(topic: string): Observable<any | Error> {
if (!this._store[topic]) {
return of(new Error(`Cannot unsubscribe. ${topic} topic does not exists.`));
}

return fromPromise(new Promise((resolve, reject) => {
this._client.unsubscribe(topic, (error: Error) => {
if (error) {
reject(error);
}

resolve();
});
})).pipe(
map(() => {
this._store[topic].unsubscribe();
this._store = Object.keys(this._store).reduce((obj, top) => {
if (top !== topic) {
obj[top] = this._store[top];
}

return obj;
}, {});

return empty();
})
);
}

publishToTopic(topic: string,
message: object | string | number | boolean | Buffer,
options?: IClientPublishOptions): Observable<any | Error> {
return fromPromise(new Promise((resolve, reject) => {
let msg: string | Buffer;

if (!(message instanceof Buffer)) {
if (typeof message === 'object') {
msg = JSON.stringify(message);
} else {
msg = message.toString();
}
} else {
msg = message;
}

this._client.publish(topic, msg, options, (error: Error) => {
if (error) {
reject(error);
}

resolve();
});
}));
}

end(force?: boolean, cb?: (...args) => void): void {
this.unsubscribeAll();
this._client.end(force, cb);
}

private unsubscribeAll(): void {
Object.keys(this._store).forEach(key => {
this._store[key].unsubscribe();
});
}

private updateTopic(topic: string, message: string): void {
let msg: string | object;
try {
msg = JSON.parse(message);
} catch {
msg = message;
}
this._store[topic].next(msg);
}

private addTopic<T>(topic: string): Observable<T> {
this._store[topic] = new Subject<T>();
return this._store[topic];
}
}
Loading

0 comments on commit 3110961

Please sign in to comment.