Skip to content
This repository has been archived by the owner on Aug 24, 2020. It is now read-only.

Commit

Permalink
Add subscription success check
Browse files Browse the repository at this point in the history
  • Loading branch information
wermerb committed Mar 24, 2018
1 parent 1942926 commit c5c6236
Show file tree
Hide file tree
Showing 18 changed files with 140 additions and 102 deletions.
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ export class AppComponent implements OnDestroy {

/**
* Subscribes to fooBar topic.
* This subscription will only emit new value if someone publish into the fooBar topic.
* The first emitted value will be a {@see SubscriptionGrant} to confirm your subscription was successful.
* After that the subscription will only emit new value if someone publishes into the fooBar topic.
* */
subscribe(): void {
this._mqttService.subscribeTo<Foo>('fooBar')
.subscribe((msg: Foo) => {
this.messages.push(msg)
this._mqttService.subscribeTo<any>('fooBar').pipe()
.subscribe((msg) => {
if (msg instanceof SubscriptionGrant) {
console.log('Successfully subscribed!');
} else {
this.messages.push(msg);
}
});
}

Expand Down
18 changes: 9 additions & 9 deletions e2e/app.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { AppPage } from './app.po';
import {AppPage} from './app.po';

describe('ngx-mqtt-client App', () => {
let page: AppPage;
let page: AppPage;

beforeEach(() => {
page = new AppPage();
});
beforeEach(() => {
page = new AppPage();
});

it('should display welcome message', () => {
page.navigateTo();
expect(page.getParagraphText()).toEqual('Welcome to app!');
});
it('should display welcome message', () => {
page.navigateTo();
expect(page.getParagraphText()).toEqual('Welcome to app!');
});
});
14 changes: 7 additions & 7 deletions e2e/app.po.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { browser, by, element } from 'protractor';
import {browser, by, element} from 'protractor';

export class AppPage {
navigateTo() {
return browser.get('/');
}
navigateTo() {
return browser.get('/');
}

getParagraphText() {
return element(by.css('app-root h1')).getText();
}
getParagraphText() {
return element(by.css('app-root h1')).getText();
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ngx-mqtt-client",
"version": "1.0.5",
"version": "1.1.0",
"license": "MIT",
"scripts": {
"ng": "ng",
Expand Down
45 changes: 23 additions & 22 deletions protractor.conf.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
// Protractor configuration file, see link for more information
// https://github.com/angular/protractor/blob/master/lib/config.ts

const { SpecReporter } = require('jasmine-spec-reporter');
const {SpecReporter} = require('jasmine-spec-reporter');

exports.config = {
allScriptsTimeout: 11000,
specs: [
'./e2e/**/*.e2e-spec.ts'
],
capabilities: {
'browserName': 'chrome'
},
directConnect: true,
baseUrl: 'http://localhost:4200/',
framework: 'jasmine',
jasmineNodeOpts: {
showColors: true,
defaultTimeoutInterval: 30000,
print: function() {}
},
onPrepare() {
require('ts-node').register({
project: 'e2e/tsconfig.e2e.json'
});
jasmine.getEnv().addReporter(new SpecReporter({ spec: { displayStacktrace: true } }));
}
allScriptsTimeout: 11000,
specs: [
'./e2e/**/*.e2e-spec.ts'
],
capabilities: {
'browserName': 'chrome'
},
directConnect: true,
baseUrl: 'http://localhost:4200/',
framework: 'jasmine',
jasmineNodeOpts: {
showColors: true,
defaultTimeoutInterval: 30000,
print: function () {
}
},
onPrepare() {
require('ts-node').register({
project: 'e2e/tsconfig.e2e.json'
});
jasmine.getEnv().addReporter(new SpecReporter({spec: {displayStacktrace: true}}));
}
};
16 changes: 10 additions & 6 deletions src/app/app.component.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Component, OnDestroy} from '@angular/core';
import {MqttService} from './ngx-mqtt-client';
import {MqttService, SubscriptionGrant} from './ngx-mqtt-client';

export interface Foo {
bar: string;
Expand All @@ -19,14 +19,18 @@ export class AppComponent implements OnDestroy {

/**
* Subscribes to fooBar topic.
* This subscription will only emit new value if someone publish into the fooBar topic.
* The first emitted value will be a {@see SubscriptionGrant} to confirm your subscription was successful.
* After that the subscription will only emit new value if someone publishes into the fooBar topic.
* */
subscribe(): void {
this._mqttService.subscribeTo<Foo>('fooBar')
.subscribe((msg: Foo) => {
this.messages.push(msg);
this._mqttService.subscribeTo<any>('fooBar').pipe()
.subscribe((msg) => {
if (msg instanceof SubscriptionGrant) {
this.messages.push('Successfully subscribed!' as any);
} else {
this.messages.push(msg);
}
});
this.messages.push('Successfully subscribed!' as any);
}


Expand Down
2 changes: 0 additions & 2 deletions src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ import {NgModule} from '@angular/core';

import {AppComponent} from './app.component';
import {NgxMqttClientModule} from './ngx-mqtt-client';
import {BrowserAnimationsModule} from '@angular/platform-browser/animations';

@NgModule({
declarations: [
AppComponent
],
imports: [
BrowserModule,
BrowserAnimationsModule,
NgxMqttClientModule.forRoot({
host: 'broker.hivemq.com',
protocol: 'ws',
Expand Down
1 change: 1 addition & 0 deletions src/app/ngx-mqtt-client/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './services/mqtt.service';
export * from './ngx-mqtt-client.module';
export * from './models/subscription-grant';
14 changes: 14 additions & 0 deletions src/app/ngx-mqtt-client/models/subscription-grant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {ISubscriptionGrant, QoS} from 'mqtt';

export class SubscriptionGrant implements ISubscriptionGrant {

topic: string;

qos: QoS | number;

constructor(grant: ISubscriptionGrant) {
this.topic = grant.topic;
this.qos = grant.qos;
}

}
10 changes: 10 additions & 0 deletions src/app/ngx-mqtt-client/models/topic-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {SubscriptionGrant} from './subscription-grant';
import {Subject} from 'rxjs/Subject';

export interface TopicStore<T> {

grant: SubscriptionGrant;

stream: Subject<T>;

}
33 changes: 20 additions & 13 deletions src/app/ngx-mqtt-client/services/mqtt.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,43 @@ import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {MQTT_CONFIG} from '../tokens/mqtt-config.injection-token';
import {fromPromise} from 'rxjs/observable/fromPromise';
import {map, switchMap} from 'rxjs/operators';
import {map, mergeMap, switchMap} from 'rxjs/operators';
import 'rxjs/add/observable/throw';
import {empty} from 'rxjs/observable/empty';
import {of} from 'rxjs/observable/of';
import {SubscriptionGrant} from '../models/subscription-grant';
import {TopicStore} from '../models/topic-store';

@Injectable()
export class MqttService {

private _client: MqttClient;

private _store: { [topic: string]: Subject<any> } = {};
private _store: { [topic: string]: TopicStore<any> } = {};

constructor(@Inject(MQTT_CONFIG) config: IClientOptions) {
this._client = mqtt.connect(null, config);
this._client.on('message', (topic, message) => this.updateTopic(topic, message.toString()));
}

subscribeTo<T>(topic: string, options?: IClientSubscribeOptions): Observable<T> {
subscribeTo<T>(topic: string, options?: IClientSubscribeOptions): Observable<(SubscriptionGrant | T)> {
return fromPromise(new Promise((resolve, reject) => {
if (!this._store[topic]) {
this._client.subscribe(topic, options, (error: Error, granted: Array<ISubscriptionGrant>) => {
if (error) {
reject(error);
}

resolve(granted);
resolve(new SubscriptionGrant(granted[0]));
});
} else {
resolve(this._store[topic].grant);
}
resolve();
})).pipe(
switchMap(() => this.addTopic<T>(topic))
);
mergeMap((granted: SubscriptionGrant) =>
[of(granted), this.addTopic<T>(topic, granted)]
),
switchMap((message: any) => message)
)
}

unsubscribeFrom(topic: string): Observable<any> {
Expand All @@ -53,7 +59,7 @@ export class MqttService {
});
})).pipe(
map(() => {
this._store[topic].unsubscribe();
this._store[topic].stream.unsubscribe();
const {[topic]: removed, ...newStore} = this._store;
this._store = newStore;
return empty();
Expand Down Expand Up @@ -111,13 +117,14 @@ export class MqttService {
} catch (ex) {
msg = message;
}
this._store[topic].next(msg);
this._store[topic].stream.next(msg);
}

private addTopic<T>(topic: string): Observable<T> {
private addTopic<T>(topic: string, grant: SubscriptionGrant): Observable<T> {
console.log('itt')
if (!this._store[topic]) {
this._store[topic] = new Subject<T>();
this._store[topic] = {grant, stream: new Subject<T>()};
}
return this._store[topic];
return this._store[topic].stream;
}
}
2 changes: 1 addition & 1 deletion src/environments/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
// The list of which env maps to which file can be found in `.angular-cli.json`.

export const environment = {
production: false
production: false
};
12 changes: 6 additions & 6 deletions src/index.html
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>NgxMqttClient</title>
<base href="/">
<meta charset="utf-8">
<title>NgxMqttClient</title>
<base href="/">

<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="icon" type="image/x-icon" href="favicon.ico">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="icon" type="image/x-icon" href="favicon.ico">
</head>
<body>
<app-root></app-root>
<app-root></app-root>
</body>
</html>
12 changes: 6 additions & 6 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { enableProdMode } from '@angular/core';
import { platformBrowserDynamic } from '@angular/platform-browser-dynamic';
import {enableProdMode} from '@angular/core';
import {platformBrowserDynamic} from '@angular/platform-browser-dynamic';

import { AppModule } from './app/app.module';
import { environment } from './environments/environment';
import {AppModule} from './app/app.module';
import {environment} from './environments/environment';

if (environment.production) {
enableProdMode();
enableProdMode();
}

platformBrowserDynamic().bootstrapModule(AppModule)
.catch(err => console.log(err));
.catch(err => console.log(err));
24 changes: 11 additions & 13 deletions src/polyfills.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
/** Evergreen browsers require these. **/
// Used for reflect-metadata in JIT. If you use AOT (and only Angular decorators), you can remove.
import 'core-js/es7/reflect';
/***************************************************************************************************
* Zone JS is required by default for Angular itself.
*/
import 'zone.js/dist/zone'; // Included with Angular CLI.


/**
Expand All @@ -57,22 +61,16 @@ import 'core-js/es7/reflect';
* user can disable parts of macroTask/DomEvents patch by setting following flags
*/

// (window as any).__Zone_disable_requestAnimationFrame = true; // disable patch requestAnimationFrame
// (window as any).__Zone_disable_on_property = true; // disable patch onProperty such as onclick
// (window as any).__zone_symbol__BLACK_LISTED_EVENTS = ['scroll', 'mousemove']; // disable patch specified eventNames
// (window as any).__Zone_disable_requestAnimationFrame = true; // disable patch requestAnimationFrame
// (window as any).__Zone_disable_on_property = true; // disable patch onProperty such as onclick
// (window as any).__zone_symbol__BLACK_LISTED_EVENTS = ['scroll', 'mousemove']; // disable patch specified eventNames

/*
* in IE/Edge developer tools, the addEventListener will also be wrapped by zone.js
* with the following flag, it will bypass `zone.js` patch for IE/Edge
*/
/*
* in IE/Edge developer tools, the addEventListener will also be wrapped by zone.js
* with the following flag, it will bypass `zone.js` patch for IE/Edge
*/
// (window as any).__Zone_enable_cross_context_check = true;

/***************************************************************************************************
* Zone JS is required by default for Angular itself.
*/
import 'zone.js/dist/zone'; // Included with Angular CLI.



/***************************************************************************************************
* APPLICATION IMPORTS
Expand Down
11 changes: 4 additions & 7 deletions src/test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
// This file is required by karma.conf.js and loads recursively all the .spec and framework files

import 'zone.js/dist/zone-testing';
import { getTestBed } from '@angular/core/testing';
import {
BrowserDynamicTestingModule,
platformBrowserDynamicTesting
} from '@angular/platform-browser-dynamic/testing';
import {getTestBed} from '@angular/core/testing';
import {BrowserDynamicTestingModule, platformBrowserDynamicTesting} from '@angular/platform-browser-dynamic/testing';

declare const require: any;

// First, initialize the Angular testing environment.
getTestBed().initTestEnvironment(
BrowserDynamicTestingModule,
platformBrowserDynamicTesting()
BrowserDynamicTestingModule,
platformBrowserDynamicTesting()
);
// Then we find all the tests.
const context = require.context('./', true, /\.spec\.ts$/);
Expand Down
Loading

0 comments on commit c5c6236

Please sign in to comment.