Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jul 24, 2024
1 parent 2800adb commit 4ed32c6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 15 deletions.
35 changes: 28 additions & 7 deletions lib/browser/mqtt_request_response/protocol_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export interface SubscribeOptions {

export interface SubscribeCompletionEvent {
topicFilter: string,
err?: ICrtError
err?: ICrtError,
retryable?: boolean,
}

export type SubscribeCompletionEventListener = (event: SubscribeCompletionEvent) => void;
Expand Down Expand Up @@ -270,16 +271,19 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
topicFilter: subscribeOptions.topicFilter,
};

if (!mqtt5.isSuccessfulSubackReasonCode(suback.reasonCodes[0])) {
let reasonCode = suback.reasonCodes[0];
if (!mqtt5.isSuccessfulSubackReasonCode(reasonCode)) {
subscribeResult.err = new CrtError(ProtocolClientAdapter.FAILING_SUBACK_REASON_CODE);
subscribeResult.retryable = ProtocolClientAdapter.isSubackReasonCodeRetryable(reasonCode);
}
}
},
(err) => {
if (!subscribeResult) {
subscribeResult = {
topicFilter: subscribeOptions.topicFilter,
err: err
err: err,
retryable: false
};
}
}
Expand All @@ -294,16 +298,19 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {

if (response.qos >= 128) {
subscribeResult.err = new CrtError(ProtocolClientAdapter.FAILING_SUBACK_REASON_CODE);
subscribeResult.retryable = true;
} else if (response.error_code) {
subscribeResult.err = new CrtError("Internal Error");
subscribeResult.retryable = true;
}
}
},
(err) => {
if (!subscribeResult) {
subscribeResult = {
topicFilter: subscribeOptions.topicFilter,
err: err
err: err,
retryable: false,
};
}
}
Expand All @@ -317,7 +324,8 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
if (!subscribeResult) {
subscribeResult = {
topicFilter: subscribeOptions.topicFilter,
err: new CrtError(ProtocolClientAdapter.OPERATION_TIMEOUT)
err: new CrtError(ProtocolClientAdapter.OPERATION_TIMEOUT),
retryable: true,
};
}
},
Expand Down Expand Up @@ -364,7 +372,7 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
unsubscribeResult = {
topicFilter: unsubscribeOptions.topicFilter,
err: err,
retryable: false, // TODO: reevaluate if we can do anything here
retryable: false,
}
}
}
Expand All @@ -383,7 +391,7 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
unsubscribeResult = {
topicFilter: unsubscribeOptions.topicFilter,
err: err,
retryable: false, // TODO: reevaluate
retryable: false,
};
}
}
Expand Down Expand Up @@ -461,4 +469,17 @@ export class ProtocolClientAdapter extends BufferedEventEmitter {
return false;
}
}

private static isSubackReasonCodeRetryable(reasonCode: mqtt5.SubackReasonCode) : boolean {
switch (reasonCode) {
case mqtt5.SubackReasonCode.UnspecifiedError:
case mqtt5.SubackReasonCode.PacketIdentifierInUse:
case mqtt5.SubackReasonCode.ImplementationSpecificError:
case mqtt5.SubackReasonCode.QuotaExceeded:
return true;

default:
return false;
}
}
}
33 changes: 25 additions & 8 deletions lib/browser/mqtt_request_response/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0.
*/

import {CrtError} from "../error";
import {BufferedEventEmitter} from "../../common/event";
import * as protocol_adapter from "./protocol_adapter";
import * as io from "../../common/io";
Expand Down Expand Up @@ -171,6 +170,7 @@ interface SubscriptionStats {
export class SubscriptionManager extends BufferedEventEmitter {
private static logSubject : string = "SubscriptionManager";

private closed: boolean = false;
private records: Map<string, SubscriptionRecord>;

constructor(private adapter: protocol_adapter.ProtocolClientAdapter, private options: SubscriptionManagerConfig) {
Expand All @@ -184,10 +184,16 @@ export class SubscriptionManager extends BufferedEventEmitter {
}

close() {
throw new CrtError("Unimplemented");
this.closed = true;

this.unsubscribeAll();
}

acquireSubscription(options: AcquireSubscriptionConfig) : AcquireSubscriptionResult {
if (this.closed) {
return AcquireSubscriptionResult.Failure;
}

if (options.topicFilters.length == 0) {
return AcquireSubscriptionResult.Failure;
}
Expand Down Expand Up @@ -276,7 +282,7 @@ export class SubscriptionManager extends BufferedEventEmitter {
// @ts-ignore
this.activateSubscription(existingRecord);
} catch (err) {
io.logError(SubscriptionManager.logSubject, `acquire subscription for operation '${options.operationId}' failed subscription activation: ${err.toString()}`);
io.logError(SubscriptionManager.logSubject, `acquire subscription for operation '${options.operationId}' failed subscription activation: ${(err as Error).toString()}`);
return AcquireSubscriptionResult.Failure;
}
}
Expand All @@ -286,12 +292,20 @@ export class SubscriptionManager extends BufferedEventEmitter {
}

releaseSubscription(options: ReleaseSubscriptionsConfig) {
if (this.closed) {
return;
}

for (let topicFilter of options.topicFilters) {
this.removeSubscriptionListener(topicFilter, options.operationId);
}
}

purge() {
if (this.closed) {
return;
}

io.logDebug(SubscriptionManager.logSubject, `purging unused subscriptions`);
let toRemove : Array<string> = new Array<string>();
for (let [_, record] of this.records) {
Expand Down Expand Up @@ -383,7 +397,7 @@ export class SubscriptionManager extends BufferedEventEmitter {
timeoutInSeconds: this.options.operationTimeoutInSeconds
});
} catch (err) {
io.logError(SubscriptionManager.logSubject, `synchronous unsubscribe failure for '${record.topicFilter}': ${err.toString()}`);
io.logError(SubscriptionManager.logSubject, `synchronous unsubscribe failure for '${record.topicFilter}': ${(err as Error).toString()}`);
return;
}

Expand Down Expand Up @@ -485,7 +499,7 @@ export class SubscriptionManager extends BufferedEventEmitter {

record.pendingAction = SubscriptionPendingAction.Subscribing;
} catch (err) {
io.logError(SubscriptionManager.logSubject, `synchronous failure subscribing to '${record.topicFilter}': ${err.toString()}`);
io.logError(SubscriptionManager.logSubject, `synchronous failure subscribing to '${record.topicFilter}': ${(err as Error).toString()}`);

if (record.type == SubscriptionType.RequestResponse) {
this.emitEvents(record, SubscriptionEventType.SubscribeFailure);
Expand Down Expand Up @@ -514,9 +528,12 @@ export class SubscriptionManager extends BufferedEventEmitter {
record.status = SubscriptionStatus.Subscribed;
this.emitEvents(record, SubscriptionEventType.StreamingSubscriptionEstablished);
} else {
// TODO: any way to get retryable from the subscribe failure?
record.poisoned = true;
this.emitEvents(record, SubscriptionEventType.StreamingSubscriptionHalted);
if (event.retryable) {
this.activateSubscription(record);
} else {
record.poisoned = true;
this.emitEvents(record, SubscriptionEventType.StreamingSubscriptionHalted);
}
}
}

Expand Down

0 comments on commit 4ed32c6

Please sign in to comment.