Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Aug 7, 2024
1 parent ee852cf commit 351f203
Showing 1 changed file with 135 additions and 23 deletions.
158 changes: 135 additions & 23 deletions lib/browser/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import * as mqtt_request_response_internal from "../common/mqtt_request_response
import {BufferedEventEmitter} from "../common/event";
import {CrtError} from "./error";
import {IStreamingOperation, StreamingOperationOptions} from "../common/mqtt_request_response";
import {LiftedPromise, newLiftedPromise} from "../common/promise";

export * from "../common/mqtt_request_response";

Expand Down Expand Up @@ -54,9 +55,23 @@ enum OperationType {
interface Operation {
id: number,
type: OperationType,
options: mqtt_request_response.RequestResponseOperationOptions | mqtt_request_response.StreamingOperationOptions,
state: OperationState,
liftedPromise?: LiftedPromise<>
pendingSubscriptionCount: number,
inClientTables: boolean
}

interface RequestResponseOperation extends Operation {
options: mqtt_request_response.RequestResponseOperationOptions,
resultPromise: LiftedPromise<mqtt_request_response.Response>
}

interface StreamingOperation extends Operation {
options: mqtt_request_response.StreamingOperationOptions
}

interface ResponsePathEntry {
refCount: number,
correlationTokenPath: string,
}

/**
Expand All @@ -74,7 +89,11 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
private subscriptionManager : subscription_manager.SubscriptionManager;
private state: mqtt_request_response_internal.RequestResponseClientState = mqtt_request_response_internal.RequestResponseClientState.Ready;

private operations : Map<number, RequestResponseOperation> = new Map<number, RequestResponseOperation>();
private operations : Map<number, Operation> = new Map<number, Operation>();
private operationsByTopicFilter : Map<string, Set<number>> = new Map<string, Set<number>>(); // topic filter -> set of operation ids
private correlationTokenPathsByResponsePaths : Map<string, ResponsePathEntry> = new Map<string, ResponsePathEntry>(); // response topic -> response path entry
private operationsByCorrelationToken : Map<string, number> = new Map<string, number>(); // correlation token -> operation id

private operationQueue : Array<number> = new Array<number>;

private constructor(protocolClientAdapter: protocol_client_adapter.ProtocolClientAdapter, options: mqtt_request_response.RequestResponseClientOptions) {
Expand Down Expand Up @@ -136,10 +155,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
close(): void {
if (this.state != mqtt_request_response_internal.RequestResponseClientState.Closed) {
this.state = mqtt_request_response_internal.RequestResponseClientState.Closed;
this.closeStreamingOperations();

// TOFIX
//crt_native.mqtt_request_response_client_close(this.native_handle());
this.closeAllOperations();
}
}

Expand Down Expand Up @@ -167,11 +183,13 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
let id = this.nextOperationId;
this.nextOperationId++;

let operation : Operation = {
let resultPromise : LiftedPromise<mqtt_request_response.Response> = newLiftedPromise();
let operation : RequestResponseOperation = {
id: id,
type: OperationType.RequestResponse,
options: requestOptions,
state: OperationState.Queued,
resultPromise: resultPromise
}

this.operations.set(id, operation);
Expand All @@ -181,20 +199,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
this.cancelOperation(id);
}, this.operationTimeoutInSeconds * 1000)

throw new CrtError("NYI");
/*
return new Promise<mqtt_request_response.Response>((resolve, reject) => {
function curriedPromiseCallback(errorCode: number, topic?: string, response?: ArrayBuffer){
return RequestResponseClient._s_on_request_completion(resolve, reject, errorCode, topic, response);
}
try {
// TOFIX
// crt_native.mqtt_request_response_client_submit_request(this.native_handle(), requestOptions, curriedPromiseCallback);
} catch (e) {
reject(e);
}
});*/
return resultPromise.promise;
}

/**
Expand All @@ -211,7 +216,114 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
throw new CrtError("NYI");
}

private closeStreamingOperations() : void {
private closeAllOperations() : void {
// NYI
}

private cancelOperation(id: number) {
// NYI
}

private removeOperationFromTopicFilterSet(topicFilter: string, id: number) {
let operationSet = this.operationsByTopicFilter.get(topicFilter);
if (!operationSet) {
return;
}

operationSet.delete(id);
if (operationSet.size > 0) {
return;
}

??;
}

private decRefResponsePaths(topic: string) {
let pathEntry = this.correlationTokenPathsByResponsePaths.get(topic);
if (!pathEntry) {
return;
}

pathEntry.refCount--;
if (pathEntry.refCount < 1) {
this.correlationTokenPathsByResponsePaths.delete(topic);
}
}

private removeRequestResponseOperation(operation: RequestResponseOperation) {
this.operations.delete(operation.id);

if (operation.inClientTables) {
for (let topicFilter of operation.options.subscriptionTopicFilters) {
this.removeOperationFromTopicFilterSet(topicFilter, operation.id);
}

for (let responsePath of operation.options.responsePaths) {
this.decRefResponsePaths(responsePath.topic);
}

let correlationToken = operation.options.correlationToken ?? "";
this.operationsByCorrelationToken.delete(correlationToken);
}
}

private removeStreamingOperation(operation: StreamingOperation) {
this.operations.delete(operation.id);

if (operation.inClientTables) {
this.removeOperationFromTopicFilterSet(operation.options.subscriptionTopicFilter, operation.id);
}
}

private removeOperation(id: number) {
let operation = this.operations.get(id);
if (!operation) {
return;
}

if (operation.type == OperationType.RequestResponse) {
this.removeRequestResponseOperation(operation as RequestResponseOperation);
} else {
this.removeStreamingOperation(operation as StreamingOperation);
}
}

private completeRequestResponseOperationWithError(id: number, err: CrtError) {
let operation = this.operations.get(id);
if (!operation) {
return;
}

if (operation.type != OperationType.RequestResponse) {
return;
}

let rrOperation = operation as RequestResponseOperation;
let promise = rrOperation.resultPromise;

this.removeOperation(id);

promise.reject(err);
}

private completeRequestResponseOperationWithResponse(id: number, responseTopic: string, payload: Buffer) {
let operation = this.operations.get(id);
if (!operation) {
return;
}

if (operation.type != OperationType.RequestResponse) {
return;
}

let rrOperation = operation as RequestResponseOperation;
let promise = rrOperation.resultPromise;

this.removeOperation(id);

promise.resolve({
topic: responseTopic,
payload: payload
});
}
}

0 comments on commit 351f203

Please sign in to comment.