Skip to content

Commit

Permalink
update canary to run multiple clients
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Nov 15, 2023
1 parent 564f191 commit e29e2fe
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 66 deletions.
135 changes: 70 additions & 65 deletions canary/mqtt5/canary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ yargs.command('*', false, (yargs: any) => {
description: 'number: transaction per second',
type: 'number',
default: 0,
},
'clients': {
description: 'number: concurrent running clients',
type: 'number',
default: 10,
}
});
}, main).parse();

let RECEIVED_TOPIC : string = "Canary/Received/Topic";
let RECEIVED_TOPIC: string = "Canary/Received/Topic";

interface CanaryMqttStatistics {
clientsUsed: number;
Expand All @@ -59,12 +64,13 @@ interface TestContext {
hostname: string;
port: number;
tps_sleep_time: number;
clients: number;
}

interface CanaryContext {
client : mqtt5.Mqtt5Client;
clients: mqtt5.Mqtt5Client[];

mqttStats : CanaryMqttStatistics;
mqttStats: CanaryMqttStatistics;

subscriptions: string[];
}
Expand All @@ -73,31 +79,40 @@ function sleep(millisecond: number) {
return new Promise((resolve) => setInterval(resolve, millisecond));
}

function createCanaryClient(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client {
const client_config : mqtt5.Mqtt5ClientConfig = {
function createCanaryClients(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client[] {
const client_config: mqtt5.Mqtt5ClientConfig = {
hostName: testContext.hostname,
port: testContext.port
};

let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);
const clients = [];

client.on('error', (error: ICrtError) => {});
client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => {
mqttStats.publishesReceived++;
});
for (let i = 0; i < testContext.clients; i++) {
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);

client.on('error', (error: ICrtError) => { });
client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => {
mqttStats.publishesReceived++;
});

++mqttStats.clientsUsed;

return client;
clients.push(client);
}

return clients;
}

async function doSubscribe(context : CanaryContext) {
async function doSubscribe(context: CanaryContext) {
let topicFilter: string = `Mqtt5/Canary/RandomSubscribe${uuid()}`;

try {
context.mqttStats.subscribesAttempted++;

await context.client.subscribe({
let index = Math.floor(Math.random() * 10);
await context.clients[index].subscribe({
subscriptions: [
{topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});

Expand All @@ -109,7 +124,7 @@ async function doSubscribe(context : CanaryContext) {
}
}

async function doUnsubscribe(context : CanaryContext) {
async function doUnsubscribe(context: CanaryContext) {
if (context.subscriptions.length == 0) {
return;
}
Expand All @@ -119,8 +134,9 @@ async function doUnsubscribe(context : CanaryContext) {
try {
context.mqttStats.unsubscribesAttempted++;

await context.client.unsubscribe({
topicFilters: [ topicFilter ]
let index = Math.floor(Math.random() * 10);
await context.clients[index].unsubscribe({
topicFilters: [topicFilter]
});

context.mqttStats.unsubscribesSucceeded++;
Expand All @@ -130,11 +146,11 @@ async function doUnsubscribe(context : CanaryContext) {
}
}

async function doPublish(context : CanaryContext, qos: mqtt5.QoS) {
async function doPublish(context: CanaryContext, qos: mqtt5.QoS) {
try {
context.mqttStats.publishesAttempted++;

await context.client.publish({
let index = Math.floor(Math.random() * 10);
await context.clients[index].publish({
topicName: RECEIVED_TOPIC,
qos: qos,
payload: Buffer.alloc(10000),
Expand All @@ -145,7 +161,7 @@ async function doPublish(context : CanaryContext, qos: mqtt5.QoS) {
correlationData: Buffer.alloc(3000),
contentType: "not-json",
userProperties: [
{name: "name", value: "value"}
{ name: "name", value: "value" }
]
});

Expand All @@ -155,15 +171,30 @@ async function doPublish(context : CanaryContext, qos: mqtt5.QoS) {
}
}

async function runCanaryIteration(testContext: TestContext, endTime: Date, mqttStats: CanaryMqttStatistics) {
async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed: number = 0;

let context : CanaryContext = {
client : createCanaryClient(testContext, mqttStats),
mqttStats : mqttStats,
subscriptions : []
let context: CanaryContext = {
clients: createCanaryClients(testContext, mqttStats),
mqttStats: mqttStats,
subscriptions: []
};

mqttStats.clientsUsed++;
// Start clients
for (let i = 0; i < context.clients.length; i++) {
context.clients[i].start();
const connectionSuccess = once(context.clients[i], "connectionSuccess");

await connectionSuccess;

await context.clients[i].subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});
}

let operationTable = [
{ weight : 1, op: async () => { await doSubscribe(context); }},
Expand All @@ -176,54 +207,27 @@ async function runCanaryIteration(testContext: TestContext, endTime: Date, mqttS
return operation.weight;
});

const connectionSuccess = once(context.client, "connectionSuccess");

context.client.start();

await connectionSuccess;

await context.client.subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});
while (secondsElapsed < testContext.duration) {

let currentTime : Date = new Date();
while (currentTime.getTime() < endTime.getTime()) {
let index : number = weightedRandom(weightedOperations);
let index: number = weightedRandom(weightedOperations);

await (operationTable[index].op)();
++context.mqttStats.totalOperation;
await sleep(testContext.tps_sleep_time);
currentTime = new Date();
}

const stopped = once(context.client, "stopped");

context.client.stop();

await stopped;

context.client.close();
}

async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed : number = 0;
let iteration : number = 0;

while (secondsElapsed < testContext.duration) {
let iterationTime: number = Math.min(testContext.duration - secondsElapsed, 60);
let iterationEnd = new Date(currentTime.getTime() + iterationTime * 1000);
await runCanaryIteration(testContext, iterationEnd, mqttStats);
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
}

iteration++;
console.log(`Iteration ${iteration} stats: ${JSON.stringify(mqttStats)}`);

currentTime = new Date();
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
// Stop and close clients
for (let i = 0; i < context.clients.length; i++) {
const stopped = once(context.clients[i], "stopped");
context.clients[i].stop();
await stopped;
context.clients[i].close();
}

}

async function main(args : Args){
Expand All @@ -247,6 +251,7 @@ async function main(args : Args){
hostname: args.endpoint,
port: args.port,
tps_sleep_time: args.tps == 0 ? 0 : (1000 / args.tps),
clients: args.clients,
}

await runCanary(testContext, mqttStats);
Expand Down
3 changes: 2 additions & 1 deletion codebuild/mqtt5-nodejs-canary-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ env:
variables:
CANARY_DURATION: 25200
CANARY_TPS: 50
CANARY_CLIENT_COUNT: 10
CANARY_LOG_FILE: 'canary_log.txt'
CANARY_LOG_LEVEL: 'Error'
PACKAGE_NAME: 'aws-crt-nodejs'
Expand Down Expand Up @@ -31,7 +32,7 @@ phases:
# Run the Canary
- cd canary/mqtt5
- npm install --unsafe-perm
- python3 ../../codebuild/CanaryWrapper.py --canary_executable node --canary_arguments "./dist/canary.js --duration ${CANARY_DURATION} --endpoint ${ENDPOINT} --tps ${CANARY_TPS}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}"
- python3 ../../codebuild/CanaryWrapper.py --canary_executable node --canary_arguments "./dist/canary.js --duration ${CANARY_DURATION} --endpoint ${ENDPOINT} --tps ${CANARY_TPS} --clients ${CANARY_CLIENT_COUNT}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}"
post_build:
commands:
- echo Build completed on `date`

0 comments on commit e29e2fe

Please sign in to comment.