Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup Short Run Mqtt5 Canary #503

Merged
merged 10 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 161 additions & 105 deletions canary/mqtt5/canary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/

import {ICrtError, mqtt5, mqtt5_packet} from "aws-crt";
import {ICrtError, mqtt5} from "aws-crt";
import {once} from "events";
import {v4 as uuid} from "uuid";
var weightedRandom = require('weighted-random');
Expand All @@ -13,207 +13,263 @@ type Args = { [index: string]: any };
const yargs = require('yargs');

yargs.command('*', false, (yargs: any) => {
yargs.option('duration', {
description: 'INT: time in seconds to run the canary',
type: 'number',
default: 3600,
})
yargs.option({
'duration': {
description: 'INT: time in seconds to run the canary',
type: 'number',
default: 120,
},
'endpoint': {
description: 'STR: endpoint to connect to',
type: 'string',
default: 'localhost',
},
'port': {
description: 'INT: port to connect to',
type: 'number',
default: 1883,
},
'tps': {
description: 'INT: transaction per second',
type: 'number',
default: 0,
},
'clients': {
description: 'INT: concurrent running clients',
type: 'number',
default: 10,
}
});
}, main).parse();

let RECEIVED_TOPIC : string = "Canary/Received/Topic";
let RECEIVED_TOPIC: string = "Canary/Received/Topic";

interface CanaryMqttStatistics {
clientsUsed : number;
clientsUsed: number;
publishesReceived: number;
subscribesAttempted : number;
subscribesSucceeded : number;
subscribesFailed : number;
unsubscribesAttempted : number;
unsubscribesSucceeded : number;
unsubscribesFailed : number;
publishesAttempted : number;
publishesSucceeded : number;
publishesFailed : number;
subscribesAttempted: number;
subscribesSucceeded: number;
subscribesFailed: number;
unsubscribesAttempted: number;
unsubscribesSucceeded: number;
unsubscribesFailed: number;
publishesAttempted: number;
publishesSucceeded: number;
publishesFailed: number;
totalOperation: number;
}

interface TestContext {
duration: number;
hostname: string;
port: number;
tps_sleep_time: number;
clients: number;
}

interface CanaryContext {
client : mqtt5.Mqtt5Client;
clients: mqtt5.Mqtt5Client[];

mqttStats : CanaryMqttStatistics;
mqttStats: CanaryMqttStatistics;

subscriptions: string[];
subscriptions: string[][];
}

function createCanaryClient(mqttStats : CanaryMqttStatistics) : mqtt5.Mqtt5Client {
const client_config : mqtt5.Mqtt5ClientConfig = {
hostName : process.env.AWS_TEST_MQTT5_DIRECT_MQTT_HOST ?? "localhost",
port : parseInt(process.env.AWS_TEST_MQTT5_DIRECT_MQTT_PORT ?? "0")
function sleep(millisecond: number) {
return new Promise((resolve) => setInterval(resolve, millisecond));
}

function getRandomIndex(clients : mqtt5.Mqtt5Client[]): number
{
return Math.floor(Math.random() * clients.length);
}

function createCanaryClients(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client[] {
const client_config: mqtt5.Mqtt5ClientConfig = {
hostName: testContext.hostname,
port: testContext.port
};

let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);
const clients = [];

client.on('error', (error: ICrtError) => {});
client.on("messageReceived",(message: mqtt5_packet.PublishPacket) : void => {
mqttStats.publishesReceived++;
});
for (let i = 0; i < testContext.clients; i++) {
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);

client.on('error', (error: ICrtError) => { });
client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => {
mqttStats.publishesReceived++;
});

++mqttStats.clientsUsed;

return client;
clients.push(client);
}

return clients;
}

async function doSubscribe(context : CanaryContext) {
async function doSubscribe(context: CanaryContext) {
let topicFilter: string = `Mqtt5/Canary/RandomSubscribe${uuid()}`;

let index = getRandomIndex(context.clients);
try {
context.mqttStats.subscribesAttempted++;

await context.client.subscribe({
await context.clients[index].subscribe({
subscriptions: [
{topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce}
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});

context.subscriptions.push(topicFilter);
context.subscriptions[index].push(topicFilter);
context.mqttStats.subscribesSucceeded++;
} catch (err) {
context.mqttStats.subscribesFailed++;
context.subscriptions.filter(entry => entry !== topicFilter);
if(context.subscriptions[index].length > 0 )
context.subscriptions[index].filter(entry => entry !== topicFilter);
xiazhvera marked this conversation as resolved.
Show resolved Hide resolved
}
}

async function doUnsubscribe(context : CanaryContext) {
async function doUnsubscribe(context: CanaryContext) {
if (context.subscriptions.length == 0) {
return;
}

let topicFilter: string = context.subscriptions.pop() ?? "canthappen";
let index = getRandomIndex(context.clients);
let topicFilter: string = context.subscriptions[index].pop() ?? "canthappen";

try {
context.mqttStats.unsubscribesAttempted++;

await context.client.unsubscribe({
topicFilters: [ topicFilter ]
await context.clients[index].unsubscribe({
topicFilters: [topicFilter]
});

context.mqttStats.unsubscribesSucceeded++;
} catch (err) {
context.mqttStats.unsubscribesFailed++;
context.subscriptions.push(topicFilter);
context.subscriptions[index].push(topicFilter);
xiazhvera marked this conversation as resolved.
Show resolved Hide resolved
}
}

async function doPublish(context : CanaryContext, qos: mqtt5_packet.QoS) {
async function doPublish(context: CanaryContext, qos: mqtt5.QoS) {
try {
context.mqttStats.publishesAttempted++;

await context.client.publish({
// Generate random binary payload data
let payload = Buffer.alloc(10000, 'a', "utf-8");
let index = getRandomIndex(context.clients);
await context.clients[index].publish({
topicName: RECEIVED_TOPIC,
qos: qos,
payload: Buffer.alloc(10000),
payload: payload,
retain: false,
payloadFormat: mqtt5_packet.PayloadFormatIndicator.Utf8,
payloadFormat: mqtt5.PayloadFormatIndicator.Utf8,
messageExpiryIntervalSeconds: 60,
responseTopic: "talk/to/me",
correlationData: Buffer.alloc(3000),
contentType: "not-json",
userProperties: [
{name: "name", value: "value"}
{ name: "name", value: "value" }
]
});

context.mqttStats.publishesSucceeded++;
} catch (err) {
context.mqttStats.publishesFailed++;
console.log("Publish Failed with " + err);
}
}

async function runCanaryIteration(endTime: Date, mqttStats : CanaryMqttStatistics) {
async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed: number = 0;

let context : CanaryContext = {
client : createCanaryClient(mqttStats),
mqttStats : mqttStats,
subscriptions : []
let context: CanaryContext = {
clients: createCanaryClients(testContext, mqttStats),
mqttStats: mqttStats,
subscriptions: []
};

mqttStats.clientsUsed++;
// Start clients
context.clients.forEach( async client => {
client.start();
const connectionSuccess = once(client, "connectionSuccess");

await connectionSuccess;

await client.subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});
// setup empty subscription string array
context.subscriptions.push(new Array());
});

let operationTable = [
{ weight : 1, op: async () => { await doSubscribe(context); }},
{ weight : 1, op: async () => { await doUnsubscribe(context); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtMostOnce); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtLeastOnce); }}
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtMostOnce); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtLeastOnce); }}
];

var weightedOperations = operationTable.map(function (operation) {
return operation.weight;
});

const connectionSuccess = once(context.client, "connectionSuccess");
while (secondsElapsed < testContext.duration) {

context.client.start();

await connectionSuccess;

await context.client.subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce }
]
});

let currentTime : Date = new Date();
while (currentTime.getTime() < endTime.getTime()) {
let index : number = weightedRandom(weightedOperations);
let index: number = weightedRandom(weightedOperations);

await (operationTable[index].op)();

++context.mqttStats.totalOperation;
await sleep(testContext.tps_sleep_time);
currentTime = new Date();
}

const stopped = once(context.client, "stopped");

context.client.stop();

await stopped;

context.client.close();
}

async function runCanary(durationInSeconds: number, mqttStats : CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed : number = 0;
let iteration : number = 0;
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
}

while (secondsElapsed < durationInSeconds) {
let iterationTime : number = Math.min(durationInSeconds - secondsElapsed, 60);
let iterationEnd = new Date(currentTime.getTime() + iterationTime * 1000);
await runCanaryIteration(iterationEnd, mqttStats);

iteration++;
console.log(`Iteration ${iteration} stats: ${JSON.stringify(mqttStats)}`);
// Stop and close clients
context.clients.forEach( async client => {
const stopped = once(client, "stopped");
client.stop();
await stopped;
client.close();
});

currentTime = new Date();
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
}
}

async function main(args : Args){
let mqttStats : CanaryMqttStatistics = {
clientsUsed : 0,
let mqttStats : CanaryMqttStatistics = {
clientsUsed: 0,
publishesReceived: 0,
subscribesAttempted : 0,
subscribesSucceeded : 0,
subscribesFailed : 0,
unsubscribesAttempted : 0,
unsubscribesSucceeded : 0,
unsubscribesFailed : 0,
publishesAttempted : 0,
publishesSucceeded : 0,
publishesFailed : 0
subscribesAttempted: 0,
subscribesSucceeded: 0,
subscribesFailed: 0,
unsubscribesAttempted: 0,
unsubscribesSucceeded: 0,
unsubscribesFailed: 0,
publishesAttempted: 0,
publishesSucceeded: 0,
publishesFailed: 0,
totalOperation: 0,
};

await runCanary(args.duration, mqttStats);
let testContext: TestContext = {
duration: args.duration,
hostname: args.endpoint,
port: args.port,
tps_sleep_time: args.tps == 0 ? 0 : (1000 / args.tps),
clients: args.clients,
}

await runCanary(testContext, mqttStats);

console.log(`Final Stats: ${JSON.stringify(mqttStats)}`)
console.log(`Final Stats: ${JSON.stringify(mqttStats)}`);
console.log(`Operation TPS: ${mqttStats.totalOperation / testContext.duration}`);

process.exit(0);

Expand Down
2 changes: 1 addition & 1 deletion canary/mqtt5/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"homepage": "https://github.com/awslabs/aws-crt-nodejs#readme",
"devDependencies": {
"@types/node": "^10.17.17",
"typescript": "^3.8.3"
"typescript": "^4.7.4"
},
"dependencies": {
"aws-crt": "file:../../",
Expand Down
Loading
Loading