Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Aug 9, 2024
1 parent 8d8cdda commit ae2e998
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 5 deletions.
222 changes: 217 additions & 5 deletions lib/browser/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import * as mqtt_request_response from "../common/mqtt_request_response";
import * as mqtt_request_response_internal from "../common/mqtt_request_response_internal";
import {BufferedEventEmitter} from "../common/event";
import {CrtError} from "./error";
import {IStreamingOperation, StreamingOperationOptions} from "../common/mqtt_request_response";
import {LiftedPromise, newLiftedPromise} from "../common/promise";

export * from "../common/mqtt_request_response";
Expand Down Expand Up @@ -71,7 +70,7 @@ interface StreamingOperation extends Operation {

interface ResponsePathEntry {
refCount: number,
correlationTokenPath: string,
correlationTokenPath?: string[],
}

interface ServiceTaskWrapper {
Expand Down Expand Up @@ -105,8 +104,12 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
private constructor(protocolClientAdapter: protocol_client_adapter.ProtocolClientAdapter, options: mqtt_request_response.RequestResponseClientOptions) {
super();

this.protocolClientAdapter = protocolClientAdapter;
this.operationTimeoutInSeconds = options.operationTimeoutInSeconds ?? 60;
this.protocolClientAdapter = protocolClientAdapter;

this.protocolClientAdapter.addListener(protocol_client_adapter.ProtocolClientAdapter.PUBLISH_COMPLETION, this.handlePublishCompletionEvent.bind(this));
this.protocolClientAdapter.addListener(protocol_client_adapter.ProtocolClientAdapter.CONNECTION_STATUS, this.handleConnectionStatusEvent.bind(this));
this.protocolClientAdapter.addListener(protocol_client_adapter.ProtocolClientAdapter.INCOMING_PUBLISH, this.handleIncomingPublishEvent.bind(this));

let config : subscription_manager.SubscriptionManagerConfig = {
maxRequestResponseSubscriptions: options.maxRequestResponseSubscriptions,
Expand All @@ -115,7 +118,6 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
}

this.subscriptionManager = new subscription_manager.SubscriptionManager(protocolClientAdapter, config);

}

/**
Expand Down Expand Up @@ -162,6 +164,9 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
if (this.state != mqtt_request_response_internal.RequestResponseClientState.Closed) {
this.state = mqtt_request_response_internal.RequestResponseClientState.Closed;
this.closeAllOperations();

this.protocolClientAdapter.close();
this.subscriptionManager.close();
}
}

Expand Down Expand Up @@ -221,13 +226,139 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
* browser/node implementers are covariant by returning an implementation of IStreamingOperation. This split
* is necessary because event listening (which streaming operations need) cannot be modeled on an interface.
*/
createStream(streamOptions: StreamingOperationOptions) : IStreamingOperation {
createStream(streamOptions: mqtt_request_response.StreamingOperationOptions) : mqtt_request_response.IStreamingOperation {
// NYI
throw new CrtError("NYI");
}

private canOperationDequeue(operation: Operation) : boolean {
if (operation.type != OperationType.RequestResponse) {
return true;
}

let rrOperation = operation as RequestResponseOperation;
let correlationToken = rrOperation.options.correlationToken ?? "";

return !this.operationsByCorrelationToken.has(correlationToken);
}

private static buildSuscriptionListFromOperation(operation : Operation) : string[] {
if (operation.type == OperationType.RequestResponse) {
let rrOperation = operation as RequestResponseOperation;
return rrOperation.options.subscriptionTopicFilters;
} else {
let streamingOperation = operation as StreamingOperation;
return new Array(streamingOperation.options.subscriptionTopicFilter);
}
}

private addOperationToInProgressTables(operation: Operation) {
if (operation.type == OperationType.Streaming) {
let streamingOperation = operation as StreamingOperation;
let filter = streamingOperation.options.subscriptionTopicFilter;
let existingSet = this.streamingOperationsByTopicFilter.get(filter);
if (!existingSet) {
existingSet = new Set<number>();
this.streamingOperationsByTopicFilter.set(filter, existingSet);
}

existingSet.add(operation.id);
} else {
let rrOperation = operation as RequestResponseOperation;

this.operationsByCorrelationToken.set(rrOperation.options.correlationToken ?? "", operation.id);

for (let path of rrOperation.options.responsePaths) {
let existingEntry = this.correlationTokenPathsByResponsePaths.get(path.topic);
if (!existingEntry) {
existingEntry = {
refCount: 0
};

if (path.correlationTokenJsonPath) {
existingEntry.correlationTokenPath = path.correlationTokenJsonPath.split('.');
}

this.correlationTokenPathsByResponsePaths.set(path.topic, existingEntry);
}

existingEntry.refCount++;
}
}
}

private makeOperationRequest(operation : RequestResponseOperation) : void {
try {
let requestOptions = {
topic: operation.options.publishTopic,
payload: operation.options.payload,
timeoutInSeconds: this.operationTimeoutInSeconds,
completionData: operation.id
};

this.protocolClientAdapter.publish(requestOptions);

operation.state = OperationState.PendingResponse;
} catch (err) {
this.completeOperationWithError(operation.id, new CrtError(`Publish error: "${JSON.stringify(err)}"`));
return;
}
}

private handleAcquireSubscriptionResult(operation: Operation, result: subscription_manager.AcquireSubscriptionResult) {
if (result == subscription_manager.AcquireSubscriptionResult.Failure || result == subscription_manager.AcquireSubscriptionResult.NoCapacity) {
this.completeOperationWithError(operation.id, new CrtError(`Acquire subscription error: ${subscription_manager.acquireSubscriptionResultToString(result)}`));
return;
}

this.addOperationToInProgressTables(operation);

if (result == subscription_manager.AcquireSubscriptionResult.Subscribing) {
operation.state = OperationState.PendingSubscription;
return;
}

if (operation.type == OperationType.Streaming) {
operation.state = OperationState.Subscribed;
// NYI - emit streaming operation subscription established event
} else {
this.makeOperationRequest(operation as RequestResponseOperation);
}
}

private service() {
this.serviceTask = undefined;

if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
return;
}

while (this.operationQueue.length > 0) {
let headId = this.operationQueue[0];
let operation = this.operations.get(headId);
if (!operation) {
this.operationQueue.shift();
continue;
}

if (!this.canOperationDequeue(operation)) {
break;
}

let acquireOptions : subscription_manager.AcquireSubscriptionConfig = {
topicFilters: RequestResponseClient.buildSuscriptionListFromOperation(operation),
operationId: headId,
type: (operation.type == OperationType.RequestResponse) ? subscription_manager.SubscriptionType.RequestResponse : subscription_manager.SubscriptionType.EventStream,
};

let acquireResult = this.subscriptionManager.acquireSubscription(acquireOptions);
if (acquireResult == subscription_manager.AcquireSubscriptionResult.Blocked) {
break;
}

this.operationQueue.shift();
this.handleAcquireSubscriptionResult(operation, acquireResult);
}
}

private clearServiceTask() {
Expand Down Expand Up @@ -339,6 +470,23 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
promise.reject(err);
}

private haltStreamingOperationWithError(id: number, err: CrtError) {
throw new CrtError("NYI");
}

private completeOperationWithError(id: number, err: CrtError) {
let operation = this.operations.get(id);
if (!operation) {
return;
}

if (operation.type == OperationType.RequestResponse) {
this.completeRequestResponseOperationWithError(id, err);
} else {
this.haltStreamingOperationWithError(id, err);
}
}

private completeRequestResponseOperationWithResponse(id: number, responseTopic: string, payload: Buffer) {
let operation = this.operations.get(id);
if (!operation) {
Expand All @@ -359,4 +507,68 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
payload: payload
});
}

private handlePublishCompletionEvent(event: protocol_client_adapter.PublishCompletionEvent) {
if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
return;
}

if (event.err) {
let id = event.completionData as number;
this.completeOperationWithError(id, event.err as CrtError);
}
}

private handleConnectionStatusEvent(event: protocol_client_adapter.ConnectionStatusEvent) {
if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
return;
}

if (event.status == protocol_client_adapter.ConnectionState.Connected && this.operationQueue.length > 0) {
this.wakeServiceTask();
}
}

private handleIncomingPublishEvent(event: protocol_client_adapter.IncomingPublishEvent) {
if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
return;
}

let responsePathEntry = this.correlationTokenPathsByResponsePaths.get(event.topic);
if (!responsePathEntry) {
return;
}

try {
let correlationToken : string | undefined = undefined;

if (!responsePathEntry.correlationTokenPath) {
correlationToken = "";
} else {
let payloadAsString = new TextDecoder().decode(new Uint8Array(event.payload));
let payloadAsJson = JSON.parse(payloadAsString);
let segmentValue : any = payloadAsJson;
for (let segment of responsePathEntry.correlationTokenPath) {
??;
}

if (segmentValue && typeof(segmentValue) === "string") {
correlationToken = segmentValue as string;
}
}

if (!correlationToken) {
return;
}

let id = this.operationsByCorrelationToken.get(correlationToken);
if (!id) {
return;
}

this.completeRequestResponseOperationWithResponse(id, event.topic, event.payload);
} catch (err) {
;
}
}
}
33 changes: 33 additions & 0 deletions lib/browser/mqtt_request_response/protocol_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import * as mqtt311 from "../mqtt";
import * as mqtt5 from "../mqtt5";
import * as mqtt_request_response from "../../common/mqtt_request_response";
import {BufferedEventEmitter} from "../../common/event";
import {MessageReceivedEventListener} from "../mqtt5";
import {QoS} from "../mqtt";


const MS_PER_SECOND : number = 1000;
Expand Down Expand Up @@ -71,6 +73,13 @@ export interface ConnectionStatusEvent {

export type ConnectionStatusEventListener = (event: ConnectionStatusEvent) => void;

export interface IncomingPublishEvent {
topic: string,
payload?: ArrayBuffer
}

export type IncomingPublishEventListener = (event: IncomingPublishEvent) => void;

/*
* Provides a client-agnostic wrapper around the MQTT functionality needed by the browser request-response client.
*
Expand Down Expand Up @@ -101,6 +110,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
})});
};

private incomingPublishListener5 : mqtt5.MessageReceivedEventListener = (event: mqtt5.MessageReceivedEvent) => {
setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, {
topic: event.message.topicName,
payload: event.message.payload
})});
};

private connectionSuccessListener311 : mqtt311.MqttConnectionSuccess = (event : mqtt311.OnConnectionSuccessResult) => {
this.connectionState = ConnectionState.Connected;
setImmediate(() => { this.emit(ProtocolClientAdapter.CONNECTION_STATUS, {
Expand All @@ -116,6 +132,13 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
})});
};

private incomingPublishListener311 : mqtt311.OnMessageCallback = (topic: string, payload: ArrayBuffer, dup: boolean, qos: QoS, retain: boolean) => {
setImmediate(() => { this.emit(ProtocolClientAdapter.INCOMING_PUBLISH, {
topic: topic,
payload: payload
})});
};

private constructor() {
super();

Expand All @@ -130,6 +153,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

client.addListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, adapter.connectionSuccessListener5);
client.addListener(mqtt5.Mqtt5Client.DISCONNECTION, adapter.disconnectionListener5);
client.addListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, adapter.incomingPublishListener5);

adapter.connectionState = client.isConnected() ? ConnectionState.Connected : ConnectionState.Disconnected;

return adapter;
Expand All @@ -142,6 +167,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

client.addListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, adapter.connectionSuccessListener311);
client.addListener(mqtt311.MqttClientConnection.DISCONNECT, adapter.disconnectionListener311);
client.addListener(mqtt311.MqttClientConnection.MESSAGE, adapter.incomingPublishListener311);

adapter.connectionState = client.is_connected() ? ConnectionState.Connected : ConnectionState.Disconnected;

return adapter;
Expand All @@ -157,12 +184,14 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
if (this.client5) {
this.client5.removeListener(mqtt5.Mqtt5Client.CONNECTION_SUCCESS, this.connectionSuccessListener5);
this.client5.removeListener(mqtt5.Mqtt5Client.DISCONNECTION, this.disconnectionListener5);
this.client5.removeListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, this.incomingPublishListener5);
this.client5 = undefined;
}

if (this.client311) {
this.client311.removeListener(mqtt311.MqttClientConnection.CONNECTION_SUCCESS, this.connectionSuccessListener311);
this.client311.removeListener(mqtt311.MqttClientConnection.DISCONNECT, this.disconnectionListener311);
this.client311.removeListener(mqtt311.MqttClientConnection.MESSAGE, this.incomingPublishListener311);
this.client311 = undefined;
}
}
Expand Down Expand Up @@ -434,6 +463,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

static CONNECTION_STATUS : string = 'connectionStatus';

static INCOMING_PUBLISH : string = 'incomingPublish';

on(event: 'publishCompletion', listener: PublishCompletionEventListener): this;

on(event: 'subscribeCompletion', listener: SubscribeCompletionEventListener): this;
Expand All @@ -442,6 +473,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

on(event: 'connectionStatus', listener: ConnectionStatusEventListener): this;

on(event: 'incomingPublish', listener: IncomingPublishEventListener): this;

on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
Expand Down
Loading

0 comments on commit ae2e998

Please sign in to comment.