diff --git a/lib/browser/mqtt_request_response/protocol_adapter.ts b/lib/browser/mqtt_request_response/protocol_adapter.ts index 8ca06643..101c04ba 100644 --- a/lib/browser/mqtt_request_response/protocol_adapter.ts +++ b/lib/browser/mqtt_request_response/protocol_adapter.ts @@ -15,6 +15,7 @@ import * as mqtt311 from "../mqtt"; import * as mqtt5 from "../mqtt5"; import * as mqtt_request_response from "../../common/mqtt_request_response"; import {BufferedEventEmitter} from "../../common/event"; +import {QoS} from "../mqtt"; const MS_PER_SECOND : number = 1000; @@ -71,6 +72,13 @@ export interface ConnectionStatusEvent { export type ConnectionStatusEventListener = (event: ConnectionStatusEvent) => void; +export interface IncomingPublishEvent { + topic: string, + payload?: ArrayBuffer +} + +export type IncomingPublishEventListener = (event: IncomingPublishEvent) => void; + /* * Provides a client-agnostic wrapper around the MQTT functionality needed by the browser request-response client. * @@ -101,6 +109,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { })}); }; + private incomingPublishListener5 : mqtt5.MessageReceivedEventListener = (event: mqtt5.MessageReceivedEvent) => { + setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, { + topic: event.message.topicName, + payload: event.message.payload + })}); + }; + private connectionSuccessListener311 : mqtt311.MqttConnectionSuccess = (event : mqtt311.OnConnectionSuccessResult) => { this.connectionState = ConnectionState.Connected; setImmediate(() => { this.emit(ProtocolClientAdapter.CONNECTION_STATUS, { @@ -116,6 +131,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { })}); }; + private incomingPublishListener311 : mqtt311.OnMessageCallback = (topic: string, payload: ArrayBuffer, dup: boolean, qos: QoS, retain: boolean) => { + setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, { + topic: topic, + payload: payload + })}); + }; + private constructor() { super(); @@ -130,6 +152,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { client.addListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, adapter.connectionSuccessListener5); client.addListener(mqtt5.Mqtt5Client.DISCONNECTION, adapter.disconnectionListener5); + client.addListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, adapter.incomingPublishListener5); + adapter.connectionState = client.isConnected() ? ConnectionState.Connected : ConnectionState.Disconnected; return adapter; @@ -142,6 +166,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { client.addListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, adapter.connectionSuccessListener311); client.addListener(mqtt311.MqttClientConnection.DISCONNECT, adapter.disconnectionListener311); + client.addListener(mqtt311.MqttClientConnection.MESSAGE, adapter.incomingPublishListener311); + adapter.connectionState = client.is_connected() ? ConnectionState.Connected : ConnectionState.Disconnected; return adapter; @@ -157,12 +183,14 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { if (this.client5) { this.client5.removeListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, this.connectionSuccessListener5); this.client5.removeListener(mqtt5.Mqtt5Client.DISCONNECTION, this.disconnectionListener5); + this.client5.removeListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, this.incomingPublishListener5); this.client5 = undefined; } if (this.client311) { this.client311.removeListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, this.connectionSuccessListener311); this.client311.removeListener(mqtt311.MqttClientConnection.DISCONNECT, this.disconnectionListener311); + this.client311.removeListener(mqtt311.MqttClientConnection.MESSAGE, this.incomingPublishListener311); this.client311 = undefined; } } @@ -434,6 +462,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { static CONNECTION_STATUS : string = 'connectionStatus'; + static INCOMING_PUBLISH : string = 'incomingPublish'; + on(event: 'publishCompletion', listener: PublishCompletionEventListener): this; on(event: 'subscribeCompletion', listener: SubscribeCompletionEventListener): this; @@ -442,6 +472,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter { on(event: 'connectionStatus', listener: ConnectionStatusEventListener): this; + on(event: 'incomingPublish', listener: IncomingPublishEventListener): this; + on(event: string | symbol, listener: (...args: any[]) => void): this { super.on(event, listener); return this; diff --git a/lib/browser/mqtt_request_response/protocol_adapter_mock.ts b/lib/browser/mqtt_request_response/protocol_adapter_mock.ts index 5d22c816..7ee7da5a 100644 --- a/lib/browser/mqtt_request_response/protocol_adapter_mock.ts +++ b/lib/browser/mqtt_request_response/protocol_adapter_mock.ts @@ -8,6 +8,7 @@ import * as protocol_adapter from "./protocol_adapter"; import {BufferedEventEmitter} from "../../common/event"; import {ICrtError} from "../../common/error"; import * as subscription_manager from "./subscription_manager"; +import {IncomingPublishEventListener} from "./protocol_adapter"; export interface ProtocolAdapterApiCall { @@ -16,8 +17,12 @@ export interface ProtocolAdapterApiCall { } export interface MockProtocolAdapterOptions { - subscribeHandler?: (subscribeOptions: protocol_adapter.SubscribeOptions) => void, - unsubscribeHandler?: (unsubscribeOptions: protocol_adapter.UnsubscribeOptions) => void, + subscribeHandler?: (adapter: MockProtocolAdapter, subscribeOptions: protocol_adapter.SubscribeOptions, context?: any) => void, + subscribeHandlerContext?: any, + unsubscribeHandler?: (adapter: MockProtocolAdapter, unsubscribeOptions: protocol_adapter.UnsubscribeOptions, context?: any) => void, + unsubscribeHandlerContext?: any, + publishHandler?: (adapter: MockProtocolAdapter, publishOptions: protocol_adapter.PublishOptions, context?: any) => void, + publishHandlerContext?: any, } export class MockProtocolAdapter extends BufferedEventEmitter { @@ -39,6 +44,10 @@ export class MockProtocolAdapter extends BufferedEventEmitter { methodName: "publish", args: publishOptions }); + + if (this.options && this.options.publishHandler) { + this.options.publishHandler(this, publishOptions, this.options.publishHandlerContext); + } } subscribe(subscribeOptions : protocol_adapter.SubscribeOptions) : void { @@ -48,7 +57,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter { }); if (this.options && this.options.subscribeHandler) { - this.options.subscribeHandler(subscribeOptions); + this.options.subscribeHandler(this, subscribeOptions, this.options.subscribeHandlerContext); } } @@ -59,7 +68,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter { }); if (this.options && this.options.unsubscribeHandler) { - this.options.unsubscribeHandler(unsubscribeOptions); + this.options.unsubscribeHandler(this, unsubscribeOptions,this.options.unsubscribeHandlerContext); } } @@ -104,6 +113,7 @@ export class MockProtocolAdapter extends BufferedEventEmitter { event.retryable = retryable; } + // TODO - rework tests to pass with deferred event emission this.emit(protocol_adapter.ProtocolClientAdapter.SUBSCRIBE_COMPLETION, event); } @@ -118,9 +128,31 @@ export class MockProtocolAdapter extends BufferedEventEmitter { event.retryable = retryable; } + // TODO - rework tests to pass with deferred event emission this.emit(protocol_adapter.ProtocolClientAdapter.UNSUBSCRIBE_COMPLETION, event); } + completePublish(completionData: any, err?: ICrtError) : void { + let event : protocol_adapter.PublishCompletionEvent = { + completionData: completionData + }; + + if (err) { + event.err = err; + } + + this.emit(protocol_adapter.ProtocolClientAdapter.PUBLISH_COMPLETION, event); + } + + triggerIncomingPublish(topic: string, payload: ArrayBuffer) : void { + let event : protocol_adapter.IncomingPublishEvent = { + topic : topic, + payload: payload + }; + + this.emit(protocol_adapter.ProtocolClientAdapter.INCOMING_PUBLISH, event); + } + // Events on(event: 'publishCompletion', listener: protocol_adapter.PublishCompletionEventListener): this; @@ -130,6 +162,8 @@ export class MockProtocolAdapter extends BufferedEventEmitter { on(event: 'connectionStatus', listener: protocol_adapter.ConnectionStatusEventListener): this; + on(event: 'incomingPublish', listener: IncomingPublishEventListener): this; + on(event: string | symbol, listener: (...args: any[]) => void): this { super.on(event, listener); return this; diff --git a/lib/browser/mqtt_request_response/subscription_manager.ts b/lib/browser/mqtt_request_response/subscription_manager.ts index 2c5f0b6c..07e662bb 100644 --- a/lib/browser/mqtt_request_response/subscription_manager.ts +++ b/lib/browser/mqtt_request_response/subscription_manager.ts @@ -127,6 +127,23 @@ export enum AcquireSubscriptionResult { Failure, } +export function acquireSubscriptionResultToString(result: AcquireSubscriptionResult) : string { + switch (result) { + case AcquireSubscriptionResult.Subscribed: + return "Subscribed"; + case AcquireSubscriptionResult.Subscribing: + return "Subscribing"; + case AcquireSubscriptionResult.Blocked: + return "Blocked"; + case AcquireSubscriptionResult.NoCapacity: + return "NoCapacity"; + case AcquireSubscriptionResult.Failure: + return "Failure"; + default: + return "Unknown"; + } +} + export interface SubscriptionManagerConfig { maxRequestResponseSubscriptions: number, maxStreamingSubscriptions: number, diff --git a/lib/common/mqtt_request_response_internal.ts b/lib/common/mqtt_request_response_internal.ts new file mode 100644 index 00000000..45a1080a --- /dev/null +++ b/lib/common/mqtt_request_response_internal.ts @@ -0,0 +1,20 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +/** + * @packageDocumentation + * @module mqtt_request_response + */ + +export enum StreamingOperationState { + None, + Open, + Closed, +} + +export enum RequestResponseClientState { + Ready, + Closed +} \ No newline at end of file diff --git a/lib/common/mqtt_shared.ts b/lib/common/mqtt_shared.ts index 1ebea543..358fe49c 100644 --- a/lib/common/mqtt_shared.ts +++ b/lib/common/mqtt_shared.ts @@ -45,3 +45,65 @@ export function normalize_payload(payload: any): Buffer | string { /** @internal */ export const DEFAULT_KEEP_ALIVE : number = 1200; + + +function isValidTopicInternal(topic: string, isFilter: boolean) : boolean { + if (topic.length === 0 || topic.length > 65535) { + return false; + } + + let sawHash : boolean = false; + for (let segment of topic.split('/')) { + if (sawHash) { + return false; + } + + if (segment.length === 0) { + continue; + } + + if (segment.includes("+")) { + if (!isFilter) { + return false; + } + + if (segment.length > 1) { + return false; + } + } + + if (segment.includes("#")) { + if (!isFilter) { + return false; + } + + if (segment.length > 1) { + return false; + } + + sawHash = true; + } + } + + return true; +} + +export function isValidTopicFilter(topicFilter: any) : boolean { + if (typeof(topicFilter) !== 'string') { + return false; + } + + let topicFilterAsString = topicFilter as string; + + return isValidTopicInternal(topicFilterAsString, true); +} + +export function isValidTopic(topic: any) : boolean { + if (typeof(topic) !== 'string') { + return false; + } + + let topicAsString = topic as string; + + return isValidTopicInternal(topicAsString, false); +} \ No newline at end of file diff --git a/test/test_env.ts b/test/test_env.ts index 690e0a0e..1d8d1647 100644 --- a/test/test_env.ts +++ b/test/test_env.ts @@ -108,8 +108,7 @@ export class AWS_IOT_ENV { return AWS_IOT_ENV.MQTT5_HOST !== "" && AWS_IOT_ENV.MQTT5_REGION !== "" && AWS_IOT_ENV.MQTT5_CRED_ACCESS_KEY !== "" && - AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY !== "" && - AWS_IOT_ENV.MQTT5_CRED_SESSION_TOKEN !== ""; + AWS_IOT_ENV.MQTT5_CRED_SECRET_ACCESS_KEY !== ""; } public static mqtt5_is_valid_cognito() {