diff --git a/canary/mqtt5/canary.ts b/canary/mqtt5/canary.ts index d4722343..1411ac10 100644 --- a/canary/mqtt5/canary.ts +++ b/canary/mqtt5/canary.ts @@ -33,11 +33,16 @@ yargs.command('*', false, (yargs: any) => { description: 'number: transaction per second', type: 'number', default: 0, + }, + 'clients': { + description: 'number: concurrent running clients', + type: 'number', + default: 10, } }); }, main).parse(); -let RECEIVED_TOPIC : string = "Canary/Received/Topic"; +let RECEIVED_TOPIC: string = "Canary/Received/Topic"; interface CanaryMqttStatistics { clientsUsed: number; @@ -59,12 +64,13 @@ interface TestContext { hostname: string; port: number; tps_sleep_time: number; + clients: number; } interface CanaryContext { - client : mqtt5.Mqtt5Client; + clients: mqtt5.Mqtt5Client[]; - mqttStats : CanaryMqttStatistics; + mqttStats: CanaryMqttStatistics; subscriptions: string[]; } @@ -73,31 +79,40 @@ function sleep(millisecond: number) { return new Promise((resolve) => setInterval(resolve, millisecond)); } -function createCanaryClient(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client { - const client_config : mqtt5.Mqtt5ClientConfig = { +function createCanaryClients(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client[] { + const client_config: mqtt5.Mqtt5ClientConfig = { hostName: testContext.hostname, port: testContext.port }; - let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config); + const clients = []; - client.on('error', (error: ICrtError) => {}); - client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => { - mqttStats.publishesReceived++; - }); + for (let i = 0; i < testContext.clients; i++) { + let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config); + + client.on('error', (error: ICrtError) => { }); + client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => { + mqttStats.publishesReceived++; + }); + + ++mqttStats.clientsUsed; - return client; + clients.push(client); + } + + return clients; } -async function doSubscribe(context : CanaryContext) { +async function doSubscribe(context: CanaryContext) { let topicFilter: string = `Mqtt5/Canary/RandomSubscribe${uuid()}`; try { context.mqttStats.subscribesAttempted++; - await context.client.subscribe({ + let index = Math.floor(Math.random() * 10); + await context.clients[index].subscribe({ subscriptions: [ - {topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce } + { topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce } ] }); @@ -109,7 +124,7 @@ async function doSubscribe(context : CanaryContext) { } } -async function doUnsubscribe(context : CanaryContext) { +async function doUnsubscribe(context: CanaryContext) { if (context.subscriptions.length == 0) { return; } @@ -119,8 +134,9 @@ async function doUnsubscribe(context : CanaryContext) { try { context.mqttStats.unsubscribesAttempted++; - await context.client.unsubscribe({ - topicFilters: [ topicFilter ] + let index = Math.floor(Math.random() * 10); + await context.clients[index].unsubscribe({ + topicFilters: [topicFilter] }); context.mqttStats.unsubscribesSucceeded++; @@ -130,11 +146,11 @@ async function doUnsubscribe(context : CanaryContext) { } } -async function doPublish(context : CanaryContext, qos: mqtt5.QoS) { +async function doPublish(context: CanaryContext, qos: mqtt5.QoS) { try { context.mqttStats.publishesAttempted++; - - await context.client.publish({ + let index = Math.floor(Math.random() * 10); + await context.clients[index].publish({ topicName: RECEIVED_TOPIC, qos: qos, payload: Buffer.alloc(10000), @@ -145,7 +161,7 @@ async function doPublish(context : CanaryContext, qos: mqtt5.QoS) { correlationData: Buffer.alloc(3000), contentType: "not-json", userProperties: [ - {name: "name", value: "value"} + { name: "name", value: "value" } ] }); @@ -155,15 +171,30 @@ async function doPublish(context : CanaryContext, qos: mqtt5.QoS) { } } -async function runCanaryIteration(testContext: TestContext, endTime: Date, mqttStats: CanaryMqttStatistics) { +async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) { + let startTime: Date = new Date(); + let currentTime: Date = startTime; + let secondsElapsed: number = 0; - let context : CanaryContext = { - client : createCanaryClient(testContext, mqttStats), - mqttStats : mqttStats, - subscriptions : [] + let context: CanaryContext = { + clients: createCanaryClients(testContext, mqttStats), + mqttStats: mqttStats, + subscriptions: [] }; - mqttStats.clientsUsed++; + // Start clients + for (let i = 0; i < context.clients.length; i++) { + context.clients[i].start(); + const connectionSuccess = once(context.clients[i], "connectionSuccess"); + + await connectionSuccess; + + await context.clients[i].subscribe({ + subscriptions: [ + { topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce } + ] + }); + } let operationTable = [ { weight : 1, op: async () => { await doSubscribe(context); }}, @@ -176,54 +207,27 @@ async function runCanaryIteration(testContext: TestContext, endTime: Date, mqttS return operation.weight; }); - const connectionSuccess = once(context.client, "connectionSuccess"); - - context.client.start(); - - await connectionSuccess; - - await context.client.subscribe({ - subscriptions: [ - { topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce } - ] - }); + while (secondsElapsed < testContext.duration) { - let currentTime : Date = new Date(); - while (currentTime.getTime() < endTime.getTime()) { - let index : number = weightedRandom(weightedOperations); + let index: number = weightedRandom(weightedOperations); await (operationTable[index].op)(); ++context.mqttStats.totalOperation; await sleep(testContext.tps_sleep_time); currentTime = new Date(); - } - - const stopped = once(context.client, "stopped"); - - context.client.stop(); - - await stopped; - - context.client.close(); -} -async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) { - let startTime: Date = new Date(); - let currentTime: Date = startTime; - let secondsElapsed : number = 0; - let iteration : number = 0; - - while (secondsElapsed < testContext.duration) { - let iterationTime: number = Math.min(testContext.duration - secondsElapsed, 60); - let iterationEnd = new Date(currentTime.getTime() + iterationTime * 1000); - await runCanaryIteration(testContext, iterationEnd, mqttStats); + secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000; + } - iteration++; - console.log(`Iteration ${iteration} stats: ${JSON.stringify(mqttStats)}`); - currentTime = new Date(); - secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000; + // Stop and close clients + for (let i = 0; i < context.clients.length; i++) { + const stopped = once(context.clients[i], "stopped"); + context.clients[i].stop(); + await stopped; + context.clients[i].close(); } + } async function main(args : Args){ @@ -247,6 +251,7 @@ async function main(args : Args){ hostname: args.endpoint, port: args.port, tps_sleep_time: args.tps == 0 ? 0 : (1000 / args.tps), + clients: args.clients, } await runCanary(testContext, mqttStats); diff --git a/codebuild/mqtt5-nodejs-canary-test.yml b/codebuild/mqtt5-nodejs-canary-test.yml index c075b160..68b4df4c 100644 --- a/codebuild/mqtt5-nodejs-canary-test.yml +++ b/codebuild/mqtt5-nodejs-canary-test.yml @@ -4,6 +4,7 @@ env: variables: CANARY_DURATION: 25200 CANARY_TPS: 50 + CANARY_CLIENT_COUNT: 10 CANARY_LOG_FILE: 'canary_log.txt' CANARY_LOG_LEVEL: 'Error' PACKAGE_NAME: 'aws-crt-nodejs' @@ -31,7 +32,7 @@ phases: # Run the Canary - cd canary/mqtt5 - npm install --unsafe-perm - - python3 ../../codebuild/CanaryWrapper.py --canary_executable node --canary_arguments "./dist/canary.js --duration ${CANARY_DURATION} --endpoint ${ENDPOINT} --tps ${CANARY_TPS}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}" + - python3 ../../codebuild/CanaryWrapper.py --canary_executable node --canary_arguments "./dist/canary.js --duration ${CANARY_DURATION} --endpoint ${ENDPOINT} --tps ${CANARY_TPS} --clients ${CANARY_CLIENT_COUNT}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}" post_build: commands: - echo Build completed on `date` \ No newline at end of file