Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #8 from amarzavery/errorname
Browse files Browse the repository at this point in the history
Multiple updates
  • Loading branch information
amarzavery authored Oct 17, 2018
2 parents bf3d06d + 6eb7a06 commit fe5b384
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 76 deletions.
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
### 2018-10-17 0.1.5
- Updated error code mappers
- Added more constants
- Added support for handling servicebus response properties in the request/response operations
- Fixed a bug in the `sendRequest()` method which ensures that the operation will actually be
retried, rather than returning the previously rejected promise.
- Removed dependency from `uuid`, since `rhea` supports basic uuid operations.

### 2018-10-03 0.1.4
- `ConnectionConfig.entityPath` is optional. Hence, `ConnectionConfig.create()` and
`ConnectionConfig.validate()` will not throw an error if `entityPath` is not defined. However,
Expand Down
7 changes: 3 additions & 4 deletions lib/ConnectionContextBase.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

import { Connection, ConnectionOptions } from "rhea-promise";
import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise";
import { CbsClient } from "./cbs";
import { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
import { TokenProvider } from "./auth/token";
import { ConnectionConfig } from "./connectionConfig";
import { SasTokenProvider } from "./auth/sas";
import * as Constants from "./util/constants";
import * as os from "os";
import * as uuid from "uuid/v4";

/**
* @interface ConnectionContextBase
Expand Down Expand Up @@ -152,11 +151,11 @@ export module ConnectionContextBase {
};

const connection = new Connection(connectionOptions);
const connectionLock = `${Constants.establishConnection}-${uuid()}`;
const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
const connectionContextBase: ConnectionContextBase = {
wasConnectionCloseCalled: false,
connectionLock: connectionLock,
negotiateClaimLock: `${Constants.negotiateClaim}-${uuid()}`,
negotiateClaimLock: `${Constants.negotiateClaim}-${generate_uuid()}`,
connection: connection,
connectionId: connection.id,
cbsSession: new CbsClient(connection, connectionLock),
Expand Down
9 changes: 4 additions & 5 deletions lib/cbs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import { TokenInfo } from "./auth/token";
import {
EventContext, ReceiverOptions, Message as AmqpMessage, SenderEvents, ReceiverEvents,
Connection, SenderOptions
Connection, SenderOptions, generate_uuid
} from "rhea-promise";
import * as uuid from "uuid/v4";
import * as Constants from "./util/constants";
import * as log from "./log";
import { translate } from "./errors";
Expand Down Expand Up @@ -35,12 +34,12 @@ export class CbsClient {
/**
* @property {string} replyTo CBS replyTo - The reciever link name that the service should reply to.
*/
readonly replyTo: string = `${Constants.cbsReplyTo}-${uuid()}`;
readonly replyTo: string = `${Constants.cbsReplyTo}-${generate_uuid()}`;
/**
* @property {string} cbsLock The unqiue lock name per $cbs session per connection that is used to
* acquire the lock for establishing a cbs session if one does not exist for an aqmp connection.
*/
readonly cbsLock: string = `${Constants.negotiateCbsKey}-${uuid()}`;
readonly cbsLock: string = `${Constants.negotiateCbsKey}-${generate_uuid()}`;
/**
* @property {string} connectionLock The unqiue lock name per connection that is used to
* acquire the lock for establishing an amqp connection if one does not exist.
Expand Down Expand Up @@ -159,7 +158,7 @@ export class CbsClient {
try {
const request: AmqpMessage = {
body: tokenObject.token,
message_id: uuid(),
message_id: generate_uuid(),
reply_to: this.replyTo,
to: this.endpoint,
application_properties: {
Expand Down
70 changes: 68 additions & 2 deletions lib/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@ export enum ConditionStatusMapper {
* @enum {ConditionErrorNameMapper}
*/
export enum ConditionErrorNameMapper {
/**
* Error is thrown when the address is already in use.
*/
"com.microsoft:address-already-in-use" = "AddressAlreadyInUseError",
/**
* Error is thrown when the store lock is lost.
*/
"com.microsoft:store-lock-lost" = "StoreLockLostError",
/**
* Error is thrown when a matching subscription is not found.
*/
"com.microsoft:no-matching-subscription" = "NoMatchingSubscriptionError",
/**
* Error is thrown when an attempt is made to access a parition that is not owned by the
* requesting entity.
*/
"com.microsoft:partition-not-owned" = "PartitionNotOwnedError",
/**
* Error is thrown when access to publisher has been revoked.
*/
"com.microsoft:publisher-revoked" = "PublisherRevokedError",
/**
* Error is thrown when an attempt is made to create an entity that already exists.
*/
"com.microsoft:entity-already-exists" = "MessagingEntityAlreadyExistsError",
/**
* Error is thrown when trying to access/connect to a disabled messaging entity.
*/
Expand All @@ -59,6 +84,14 @@ export enum ConditionErrorNameMapper {
* Error for signaling general communication errors related to messaging operations.
*/
"amqp:not-found" = "ServiceCommunicationError",
/**
* Error is thrown when the message is not found.
*/
"com.microsoft:message-not-found" = "MessageNotFoundError",
/**
* Error is thrown when relay is not found.
*/
"com.microsoft:relay-not-found" = "RelayNotFoundError",
/**
* Error is thrown when a feature is not implemented yet but the placeholder is present.
*/
Expand Down Expand Up @@ -158,7 +191,7 @@ export enum ConditionErrorNameMapper {
/**
* Error is thrown when an attach was received using a handle that is already in use for an attached link.
*/
"amqp:session:handle-in-use" = "HanldeInUseError",
"amqp:session:handle-in-use" = "HandleInUseError",
/**
* Error is thrown when a frame (other than attach) was received referencing a handle which is not
* currently in use of an attached link.
Expand Down Expand Up @@ -205,6 +238,31 @@ export enum ConditionErrorNameMapper {
* @enum {ErrorNameConditionMapper}
*/
export enum ErrorNameConditionMapper {
/**
* Error is thrown when the address is already in use.
*/
AddressAlreadyInUseError = "com.microsoft:address-already-in-use",
/**
* Error is thrown when the store lock is lost.
*/
StoreLockLostError = "com.microsoft:store-lock-lost",
/**
* Error is thrown when a matching subscription is not found.
*/
NoMatchingSubscriptionError = "com.microsoft:no-matching-subscription",
/**
* Error is thrown when an attempt is made to access a parition that is not owned by the
* requesting entity.
*/
PartitionNotOwnedError = "com.microsoft:partition-not-owned",
/**
* Error is thrown when access to publisher has been revoked.
*/
PublisherRevokedError = "com.microsoft:publisher-revoked",
/**
* Error is thrown when an attempt is made to create an entity that already exists.
*/
MessagingEntityAlreadyExistsError = "com.microsoft:entity-already-exists",
/**
* Error is thrown when trying to access/connect to a disabled messaging entity.
*/
Expand All @@ -229,6 +287,14 @@ export enum ErrorNameConditionMapper {
* Error for signaling general communication errors related to messaging operations.
*/
ServiceCommunicationError = "amqp:not-found",
/**
* Error is thrown when message is not found.
*/
MessageNotFoundError = "com.microsoft:message-not-found",
/**
* Error is thrown when relay is not found.
*/
RelayNotFoundError = "com.microsoft:relay-not-found",
/**
* Error is thrown when a feature is not implemented yet but the placeholder is present.
*/
Expand Down Expand Up @@ -320,7 +386,7 @@ export enum ErrorNameConditionMapper {
/**
* Error is thrown when an attach was received using a handle that is already in use for an attached link.
*/
HanldeInUseError = "amqp:session:handle-in-use",
HandleInUseError = "amqp:session:handle-in-use",
/**
* Error is thrown when a frame (other than attach) was received referencing a handle which is not
* currently in use of an attached link.
Expand Down
34 changes: 23 additions & 11 deletions lib/requestResponseLink.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

import * as uuid from "uuid/v4";
import * as Constants from "./util/constants";
import { retry, RetryConfig, RetryOperationType } from "./retry";
import {
Session, Connection, Sender, Receiver, Message as AmqpMessage, EventContext, AmqpError,
SenderOptions, ReceiverOptions, ReceiverEvents, ReqResLink
SenderOptions, ReceiverOptions, ReceiverEvents, ReqResLink, generate_uuid
} from "rhea-promise";
import { translate, ConditionStatusMapper } from "./errors";
import * as log from "./log";
Expand Down Expand Up @@ -81,27 +80,40 @@ export class RequestResponseLink implements ReqResLink {
throw new Error("request is a required parameter and must be of type 'object'.");
}

if (!request.message_id) request.message_id = uuid();
if (!request.message_id) request.message_id = generate_uuid();

if (!options) options = {};

if (!options.timeoutInSeconds) {
options.timeoutInSeconds = 10;
}

const sendRequestPromise: Promise<AmqpMessage> = new Promise<AmqpMessage>((resolve: any, reject: any) => {
const sendRequestPromise = () => new Promise<AmqpMessage>((resolve: any, reject: any) => {
let waitTimer: any;
let timeOver: boolean = false;
type NormalizedInfo = {
statusCode: number;
statusDescription: string;
errorCondition: string;
};

// Handle different variations of property names in responses emitted by EventHubs and ServiceBus.
const getCodeDescriptionAndError = (props: any): NormalizedInfo => {
if (!props) props = {};
return {
statusCode: (props[Constants.statusCode] || props.statusCode) as number,
statusDescription: (props[Constants.statusDescription] || props.statusDescription) as string,
errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string
};
};

const messageCallback = (context: EventContext) => {
// remove the event listener as this will be registered next time when someone makes a request.
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
const code: number = context.message!.application_properties![Constants.statusCode];
const desc: string = context.message!.application_properties![Constants.statusDescription];
const errorCondition: string | undefined = context.message!.application_properties![Constants.errorCondition];
const info = getCodeDescriptionAndError(context.message!.application_properties);
const responseCorrelationId = context.message!.correlation_id;
log.reqres("[%s] %s response: ", this.connection.id, request.to || "$management", context.message);
if (code > 199 && code < 300) {
if (info.statusCode > 199 && info.statusCode < 300) {
if (request.message_id === responseCorrelationId || request.correlation_id === responseCorrelationId) {
if (!timeOver) {
clearTimeout(waitTimer);
Expand All @@ -115,10 +127,10 @@ export class RequestResponseLink implements ReqResLink {
this.connection.id, request.message_id, responseCorrelationId);
}
} else {
const condition = errorCondition || ConditionStatusMapper[code] || "amqp:internal-error";
const condition = info.errorCondition || ConditionStatusMapper[info.statusCode] || "amqp:internal-error";
const e: AmqpError = {
condition: condition,
description: desc
description: info.statusDescription
};
const error = translate(e);
log.error(error);
Expand All @@ -145,7 +157,7 @@ export class RequestResponseLink implements ReqResLink {
this.sender.send(request);
});
const config: RetryConfig<AmqpMessage> = {
operation: () => sendRequestPromise,
operation: sendRequestPromise,
connectionId: this.connection.id,
operationType: request.to && request.to === Constants.cbsEndpoint
? RetryOperationType.cbsAuth
Expand Down
26 changes: 26 additions & 0 deletions lib/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,29 @@ export const defaultRetryAttempts = 3;
export const defaultConnectionRetryAttempts = 150;
export const defaultDelayBetweenOperationRetriesInSeconds = 5;
export const defaultDelayBetweenRetriesInSeconds = 15;
export const receiverSettleMode = "receiver-settle-mode";
export const dispositionStatus = "disposition-status";
export const fromSequenceNumber = "from-sequence-number";
export const messageCount = "message-count";
export const lockTokens = "lock-tokens";
export const sequenceNumbers = "sequence-numbers";
export const deadLetterReason = "deadletter-reason";
export const deadLetterDescription = "deadletter-description";
export const propertiesToModify = "properties-to-modify";
export const trackingId = "com.microsoft:tracking-id";
export const serverTimeout = "com.microsoft:server-timeout";
export const operations = {
cancelScheduledMessage: "com.microsoft:cancel-scheduled-message",
scheduleMessage: "com.microsoft:schedule-message",
renewLock: "com.microsoft:renew-lock",
peekMessage: "com.microsoft:peek-message",
receiveBySequenceNumber: "com.microsoft:receive-by-sequence-number",
updateDisposition: "com.microsoft:update-disposition",
renewSessionLock: "com.microsoft:renew-session-lock",
setSessionState: "com.microsoft:set-session-state",
getSessionState: "com.microsoft:get-session-state",
enumerateSessions: "com.microsoft:get-message-sessions",
addRule: "com.microsoft:add-rule",
removeRule: "com.microsoft:remove-rule",
enumerateRules: "com.microsoft:enumerate-rules"
};
Loading

0 comments on commit fe5b384

Please sign in to comment.