From c6e65d5dd2845f4094043a7b6eb9e13d7453faa8 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 9 Nov 2023 14:45:55 -0800 Subject: [PATCH 1/4] Update For V8 Memory Cage (#476) --- source/http_stream.c | 2 +- source/module.c | 42 ++++++++++++++++++++++++++++++--- source/module.h | 19 +++++++++++++++ source/mqtt_client_connection.c | 8 +++++-- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/source/http_stream.c b/source/http_stream.c index 23923a196..5921b90ce 100644 --- a/source/http_stream.c +++ b/source/http_stream.c @@ -140,7 +140,7 @@ static void s_on_body_call(napi_env env, napi_value on_body, void *context, void AWS_NAPI_ENSURE( env, - napi_create_external_arraybuffer( + aws_napi_create_external_arraybuffer( env, args->chunk.buffer, args->chunk.len, s_external_arraybuffer_finalizer, args, ¶ms[0])); AWS_NAPI_ENSURE( diff --git a/source/module.c b/source/module.c index 40309cd01..c4bb91337 100644 --- a/source/module.c +++ b/source/module.c @@ -268,9 +268,10 @@ int aws_napi_attach_object_property_binary_as_finalizable_external( } napi_value napi_binary = NULL; + AWS_NAPI_ENSURE( env, - napi_create_external_arraybuffer( + aws_napi_create_external_arraybuffer( env, data_buffer->buffer, data_buffer->len, @@ -831,7 +832,6 @@ const char *aws_napi_status_to_str(napi_status status) { case napi_callback_scope_mismatch: reason = "napi_callback_scope_mismatch"; break; -#if NAPI_VERSION >= 3 case napi_queue_full: reason = "napi_queue_full"; break; @@ -841,7 +841,9 @@ const char *aws_napi_status_to_str(napi_status status) { case napi_bigint_expected: reason = "napi_bigint_expected"; break; -#endif + case napi_no_external_buffers_allowed: + reason = "napi_no_external_buffers_allowed"; + break; } return reason; } @@ -931,6 +933,40 @@ static void s_handle_failed_callback(napi_env env, napi_value function, napi_sta } } +napi_status aws_napi_create_external_arraybuffer( + napi_env env, + void *external_data, + size_t byte_length, + napi_finalize finalize_cb, + void *finalize_hint, + napi_value *result) { + + napi_status external_buffer_status = + napi_create_external_arraybuffer(env, external_data, byte_length, finalize_cb, finalize_hint, result); + + if (external_buffer_status == napi_no_external_buffers_allowed) { + + // The external buffer is disabled, manually copy the external_data into Node + void *napi_buf_data = NULL; + napi_status create_arraybuffer_status = napi_create_arraybuffer(env, byte_length, &napi_buf_data, result); + + if (create_arraybuffer_status != napi_ok) { + AWS_NAPI_LOGF_ERROR( + "napi_create_arraybuffer (in aws_napi_create_external_arraybuffer) failed with : %s", + aws_napi_status_to_str(create_arraybuffer_status)); + return create_arraybuffer_status; + } + + memcpy(napi_buf_data, external_data, byte_length); + + // As the data has been copied into the Node, invoke the finalize callback to make sure the + // data is released. + finalize_cb(env, finalize_hint, finalize_hint); + } + + return napi_ok; +} + napi_status aws_napi_dispatch_threadsafe_function( napi_env env, napi_threadsafe_function tsfn, diff --git a/source/module.h b/source/module.h index b9fd2349f..f4d8faa40 100644 --- a/source/module.h +++ b/source/module.h @@ -225,6 +225,25 @@ struct aws_client_bootstrap *aws_napi_get_default_client_bootstrap(void); const char *aws_napi_status_to_str(napi_status status); +/* + * Wrapper around napi_create_external_arraybuffer, + * The function returns `napi_ok` if array buffer is created successfully in nodejs. Otherwise returns the error code. + * The user is responsible to release/proceed the `external_data` if the creation failed. + * + * `aws_napi_create_external_arraybuffer` handles the creation of the arraybuffer from the `external_data`. As + * some runtimes other than Node.js have dropped support for external buffers, the napi function call will fail in such + * case. If the call failed, the function will directly create an arraybuffer in Node and copy the data of external + * buffer into it. Once data copied, the `finalize_cb` will be immediately invoked to release the external data. + * + */ +napi_status aws_napi_create_external_arraybuffer( + napi_env env, + void *external_data, + size_t byte_length, + napi_finalize finalize_cb, + void *finalize_hint, + napi_value *result); + /** * Gets the allocator used to allocate native resources in the node environment, should be used * by all binding code in this extension diff --git a/source/mqtt_client_connection.c b/source/mqtt_client_connection.c index 341c4cfdd..ce5775312 100644 --- a/source/mqtt_client_connection.c +++ b/source/mqtt_client_connection.c @@ -1424,15 +1424,17 @@ static void s_on_publish_call(napi_env env, napi_value on_publish, void *context AWS_NAPI_ENSURE( env, napi_create_string_utf8(env, (const char *)args->topic.buffer, args->topic.len, ¶ms[0])); + AWS_NAPI_ENSURE( env, - napi_create_external_arraybuffer( + aws_napi_create_external_arraybuffer( env, args->payload->buffer, args->payload->len, s_publish_external_arraybuffer_finalizer, args->payload, ¶ms[1])); + AWS_NAPI_ENSURE(env, napi_get_boolean(env, args->dup, ¶ms[2])); AWS_NAPI_ENSURE(env, napi_create_int32(env, args->qos, ¶ms[3])); AWS_NAPI_ENSURE(env, napi_get_boolean(env, args->retain, ¶ms[4])); @@ -1672,15 +1674,17 @@ static void s_on_any_publish_call(napi_env env, napi_value on_publish, void *con AWS_NAPI_ENSURE( env, napi_create_string_utf8(env, aws_string_c_str(args->topic), args->topic->len, ¶ms[0])); + AWS_NAPI_ENSURE( env, - napi_create_external_arraybuffer( + aws_napi_create_external_arraybuffer( env, args->payload->buffer, args->payload->len, s_any_publish_external_arraybuffer_finalizer, args->payload, ¶ms[1])); + AWS_NAPI_ENSURE(env, napi_get_boolean(env, args->dup, ¶ms[2])); AWS_NAPI_ENSURE(env, napi_create_int32(env, args->qos, ¶ms[3])); AWS_NAPI_ENSURE(env, napi_get_boolean(env, args->retain, ¶ms[4])); From 64b096b3b22effb791b8ff5e3b90ecc5b3718816 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Fri, 10 Nov 2023 08:26:07 -0800 Subject: [PATCH 2/4] Fix Segfault on Electron Exit (#501) --- lib/native/binding.d.ts | 2 + lib/native/binding.js | 14 +++++- source/module.c | 109 +++++++++++++++++++++++++++++++--------- source/module.h | 6 +++ 4 files changed, 104 insertions(+), 27 deletions(-) diff --git a/lib/native/binding.d.ts b/lib/native/binding.d.ts index 8e6ec7f0f..a3212fb81 100644 --- a/lib/native/binding.d.ts +++ b/lib/native/binding.d.ts @@ -41,6 +41,8 @@ export function native_memory_dump(): void; export function error_code_to_string(error_code: number): string; /** @internal */ export function error_code_to_name(error_code: number): string; +/** @internal */ +export function disable_threadsafe_function(): number; /* IO */ /** @internal */ diff --git a/lib/native/binding.js b/lib/native/binding.js index ebdb4bb03..b869915be 100644 --- a/lib/native/binding.js +++ b/lib/native/binding.js @@ -16,7 +16,7 @@ const CRuntimeType = Object.freeze({ }); function getCRuntime() { - if(platform() !== "linux") { + if (platform() !== "linux") { return CRuntimeType.NON_LINUX; } @@ -74,5 +74,15 @@ if (binding == undefined) { throw new Error("AWS CRT binary not present in any of the following locations:\n\t" + search_paths.join('\n\t')); } +import crt_native from "./binding" +/** Electron will shutdown the node process on exit, which causes the threadsafe function to segfault. To prevent + * the segfault we disable the threadsafe function on node process exit. */ +if (process.versions.hasOwnProperty('electron')) { + process.on('exit', function () { + crt_native.disable_threadsafe_function(); + }); +} + + export default binding; -export { CRuntimeType , cRuntime }; +export { CRuntimeType, cRuntime }; diff --git a/source/module.c b/source/module.c index c4bb91337..6acd34b54 100644 --- a/source/module.c +++ b/source/module.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -47,6 +48,15 @@ AWS_STATIC_ASSERT(NAPI_VERSION >= 4); #define AWS_DEFINE_ERROR_INFO_CRT_NODEJS(CODE, STR) AWS_DEFINE_ERROR_INFO(CODE, STR, "aws-crt-nodejs") +/* TODO: + * Hardcoded enum value for `napi_no_external_buffers_allowed`. + * The enum `napi_no_external_buffers_allowed` is introduced in node14. + * Use it for external buffer related changes after bump to node 14 */ +#define NAPI_NO_EXTERNAL_BUFFER_ENUM_VALUE 22 + +static bool s_tsfn_enabled = false; +static struct aws_rw_lock s_tsfn_lock; + /* clang-format off */ static struct aws_error_info s_errors[] = { AWS_DEFINE_ERROR_INFO_CRT_NODEJS( @@ -743,6 +753,28 @@ int aws_napi_get_property_array_size( return AWS_OP_SUCCESS; } +void s_aws_enable_threadsafe_function(void) { + aws_rw_lock_wlock(&s_tsfn_lock); + s_tsfn_enabled = true; + aws_rw_lock_wunlock(&s_tsfn_lock); +} + +void s_aws_disable_threadsafe_function(void) { + aws_rw_lock_wlock(&s_tsfn_lock); + s_tsfn_enabled = false; + aws_rw_lock_wunlock(&s_tsfn_lock); +} + +napi_value aws_napi_disable_threadsafe_function(napi_env env, napi_callback_info info) { + (void)info; + if (env == NULL) { + aws_raise_error(AWS_CRT_NODEJS_ERROR_THREADSAFE_FUNCTION_NULL_NAPI_ENV); + return NULL; + } + s_aws_disable_threadsafe_function(); + return NULL; +} + void aws_napi_throw_last_error(napi_env env) { const int error_code = aws_last_error(); napi_throw_error(env, aws_error_str(error_code), aws_error_debug_str(error_code)); @@ -975,19 +1007,25 @@ napi_status aws_napi_dispatch_threadsafe_function( size_t argc, napi_value *argv) { - napi_status call_status = napi_ok; - if (!this_ptr) { - AWS_NAPI_ENSURE(env, napi_get_undefined(env, &this_ptr)); + aws_rw_lock_rlock(&s_tsfn_lock); + napi_status result = napi_ok; + if (s_tsfn_enabled) { + napi_status call_status = napi_ok; + if (!this_ptr) { + AWS_NAPI_ENSURE(env, napi_get_undefined(env, &this_ptr)); + } + AWS_NAPI_CALL(env, napi_call_function(env, this_ptr, function, argc, argv, NULL), { + call_status = status; + s_handle_failed_callback(env, function, status); + }); + /* main thread can exit now */ + napi_unref_threadsafe_function(env, tsfn); + /* Must always decrement the ref count, or the function will be pinned */ + napi_status release_status = napi_release_threadsafe_function(tsfn, napi_tsfn_release); + result = (call_status != napi_ok) ? call_status : release_status; } - AWS_NAPI_CALL(env, napi_call_function(env, this_ptr, function, argc, argv, NULL), { - call_status = status; - s_handle_failed_callback(env, function, status); - }); - /* main thread can exit now */ - napi_unref_threadsafe_function(env, tsfn); - /* Must always decrement the ref count, or the function will be pinned */ - napi_status release_status = napi_release_threadsafe_function(tsfn, napi_tsfn_release); - return (call_status != napi_ok) ? call_status : release_status; + aws_rw_lock_runlock(&s_tsfn_lock); + return result; } napi_status aws_napi_create_threadsafe_function( @@ -1018,30 +1056,45 @@ napi_status aws_napi_create_threadsafe_function( napi_status aws_napi_release_threadsafe_function( napi_threadsafe_function function, napi_threadsafe_function_release_mode mode) { - if (function) { - return napi_release_threadsafe_function(function, mode); + napi_status result = napi_ok; + aws_rw_lock_rlock(&s_tsfn_lock); + if (s_tsfn_enabled && function) { + result = napi_release_threadsafe_function(function, mode); } - return napi_ok; + aws_rw_lock_runlock(&s_tsfn_lock); + return result; } napi_status aws_napi_acquire_threadsafe_function(napi_threadsafe_function function) { - if (function) { - return napi_acquire_threadsafe_function(function); + napi_status result = napi_ok; + aws_rw_lock_rlock(&s_tsfn_lock); + if (s_tsfn_enabled && function) { + result = napi_acquire_threadsafe_function(function); } - return napi_ok; + aws_rw_lock_runlock(&s_tsfn_lock); + return result; } napi_status aws_napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function function) { - if (function) { - return napi_unref_threadsafe_function(env, function); + napi_status result = napi_ok; + aws_rw_lock_rlock(&s_tsfn_lock); + if (s_tsfn_enabled && function) { + result = napi_unref_threadsafe_function(env, function); } - return napi_ok; + aws_rw_lock_runlock(&s_tsfn_lock); + return result; } napi_status aws_napi_queue_threadsafe_function(napi_threadsafe_function function, void *user_data) { - /* increase the ref count, gets decreased when the call completes */ - AWS_NAPI_ENSURE(NULL, napi_acquire_threadsafe_function(function)); - return napi_call_threadsafe_function(function, user_data, napi_tsfn_nonblocking); + napi_status result = napi_ok; + aws_rw_lock_rlock(&s_tsfn_lock); + if (s_tsfn_enabled && function) { + /* increase the ref count, gets decreased when the call completes */ + AWS_NAPI_ENSURE(NULL, napi_acquire_threadsafe_function(function)); + result = napi_call_threadsafe_function(function, user_data, napi_tsfn_nonblocking); + } + aws_rw_lock_runlock(&s_tsfn_lock); + return result; } AWS_STATIC_STRING_FROM_LITERAL(s_mem_tracing_env_var, "AWS_CRT_MEMORY_TRACING"); @@ -1145,6 +1198,7 @@ static void s_napi_context_finalize(napi_env env, void *user_data, void *finaliz --s_module_initialize_count; if (s_module_initialize_count == 0) { + aws_client_bootstrap_release(s_default_client_bootstrap); s_default_client_bootstrap = NULL; @@ -1163,6 +1217,8 @@ static void s_napi_context_finalize(napi_env env, void *user_data, void *finaliz aws_mqtt_library_clean_up(); s_uninstall_crash_handler(); + // clean up threadsafe function lock + aws_rw_lock_clean_up(&s_tsfn_lock); } struct aws_napi_context *ctx = user_data; @@ -1178,7 +1234,6 @@ static void s_napi_context_finalize(napi_env env, void *user_data, void *finaliz aws_mem_tracer_destroy(ctx_allocator); } } - aws_mutex_unlock(&s_module_lock); } @@ -1230,6 +1285,9 @@ static bool s_create_and_register_function( struct aws_allocator *allocator = aws_napi_get_allocator(); if (s_module_initialize_count == 0) { + aws_rw_lock_init(&s_tsfn_lock); + s_aws_enable_threadsafe_function(); + s_install_crash_handler(); aws_mqtt_library_init(allocator); @@ -1295,6 +1353,7 @@ static bool s_create_and_register_function( CREATE_AND_REGISTER_FN(native_memory_dump) CREATE_AND_REGISTER_FN(error_code_to_string) CREATE_AND_REGISTER_FN(error_code_to_name) + CREATE_AND_REGISTER_FN(disable_threadsafe_function) /* IO */ CREATE_AND_REGISTER_FN(io_logging_enable) diff --git a/source/module.h b/source/module.h index f4d8faa40..581ddd49e 100644 --- a/source/module.h +++ b/source/module.h @@ -309,6 +309,12 @@ napi_status aws_napi_unref_threadsafe_function(napi_env env, napi_threadsafe_fun */ napi_status aws_napi_queue_threadsafe_function(napi_threadsafe_function function, void *user_data); +/** + * Disable the thread safe function operations. The function will prevent any access to threadsafe function + * including acquire, release, function call and so on. + */ +napi_value aws_napi_disable_threadsafe_function(napi_env env, napi_callback_info info); + /* * One of these will be allocated each time the module init function is called * Any global state that isn't thread safe or requires clean up should be stored From ed815db9b86a25aba54a69a1576a8e94c784c3e2 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 13 Nov 2023 11:45:36 -0800 Subject: [PATCH 3/4] Mqtt5 topic aliasing (#502) --- crt/aws-c-auth | 2 +- crt/aws-c-common | 2 +- crt/aws-c-http | 2 +- crt/aws-c-mqtt | 2 +- lib/common/mqtt5_packet.ts | 10 ++++ lib/native/aws_iot_mqtt5.ts | 9 +++ lib/native/mqtt5.spec.ts | 80 +++++++++++++++++++++++++ lib/native/mqtt5.ts | 114 ++++++++++++++++++++++++++++++++++++ source/mqtt5_client.c | 103 +++++++++++++++++++++++++++++++- 9 files changed, 319 insertions(+), 5 deletions(-) diff --git a/crt/aws-c-auth b/crt/aws-c-auth index ecbb37fe6..71bad382f 160000 --- a/crt/aws-c-auth +++ b/crt/aws-c-auth @@ -1 +1 @@ -Subproject commit ecbb37fe6549e7a0f5814050be076a2312118b38 +Subproject commit 71bad382fe0a61e4426987c1abe6aca2fe1c1953 diff --git a/crt/aws-c-common b/crt/aws-c-common index 4c0a9f579..00157ef20 160000 --- a/crt/aws-c-common +++ b/crt/aws-c-common @@ -1 +1 @@ -Subproject commit 4c0a9f579d3064f086b42a2d39aaea721e7e71ca +Subproject commit 00157ef20fd270d393930ac1cfed5190ccba2af8 diff --git a/crt/aws-c-http b/crt/aws-c-http index d777859b6..a082f8a20 160000 --- a/crt/aws-c-http +++ b/crt/aws-c-http @@ -1 +1 @@ -Subproject commit d777859b6da179b9098f87a2077fbf2129b574dc +Subproject commit a082f8a2067e4a31db73f1d4ffd702a8dc0f7089 diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index c475ef1bf..5d198cf2d 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit c475ef1bfcc31f815e46558864161728a15a70ae +Subproject commit 5d198cf2d09b19bb18bf03e4425831a246d0a391 diff --git a/lib/common/mqtt5_packet.ts b/lib/common/mqtt5_packet.ts index e4bcf6965..9dcc762c4 100644 --- a/lib/common/mqtt5_packet.ts +++ b/lib/common/mqtt5_packet.ts @@ -783,6 +783,16 @@ export interface PublishPacket extends IPacket { */ messageExpiryIntervalSeconds?: number; + /** + * Sent publishes - (Node only) topic alias to use, if possible, when encoding this packet. Only used if the + * client's outbound topic aliasing mode is set to Manual. + * + * Received publishes - topic alias used by the server when transmitting the publish to the client. + * + * See [MQTT5 Topic Alias](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113) + */ + topicAlias?: number; + /** * Opaque topic string intended to assist with request/response implementations. Not internally meaningful to * MQTT5 or this client. diff --git a/lib/native/aws_iot_mqtt5.ts b/lib/native/aws_iot_mqtt5.ts index f0b480599..dcd189a2d 100644 --- a/lib/native/aws_iot_mqtt5.ts +++ b/lib/native/aws_iot_mqtt5.ts @@ -467,6 +467,15 @@ export class AwsIotMqtt5ClientConfigBuilder { return this; } + /** + * Overrides how the MQTT5 client should behave with respect to topic aliasing + * + * @param topicAliasingOptions how the MQTT5 client should behave with respect to topic aliasing + */ + withTopicAliasingOptions(topicAliasingOptions: mqtt5.TopicAliasingOptions) : AwsIotMqtt5ClientConfigBuilder { + this.config.topicAliasingOptions = topicAliasingOptions; + return this; + } /** * Constructs an MQTT5 Client configuration object for creating mqtt5 clients. diff --git a/lib/native/mqtt5.spec.ts b/lib/native/mqtt5.spec.ts index a36a69f1b..144c7a009 100644 --- a/lib/native/mqtt5.spec.ts +++ b/lib/native/mqtt5.spec.ts @@ -640,3 +640,83 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir client.close(); }); + +/* This test doesn't verify LRU aliasing it just gives some evidence that enabling LRU aliasing doesn't blow something up */ +test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Publish with LRU aliasing', async () => { + let clientConfig : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig(); + clientConfig.topicAliasingOptions = { + outboundBehavior : mqtt5.OutboundTopicAliasBehaviorType.LRU, + outboundCacheMaxSize : 10 + }; + + let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(clientConfig); + + let connectionSuccess = once(client, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); + let stopped = once(client, mqtt5.Mqtt5Client.STOPPED); + + client.start(); + + await connectionSuccess; + + let topic : string = `test-${uuid()}`; + let testPayload : Buffer = Buffer.from("Derp", "utf-8"); + let qos : mqtt5.QoS = mqtt5.QoS.AtLeastOnce; + + await client.publish({ + topicName: topic, + qos: qos, + payload: testPayload + }); + + await client.publish({ + topicName: topic, + qos: qos, + payload: testPayload + }); + + client.stop(); + await stopped; + + client.close(); +}); + +/* This test doesn't verify manual aliasing it just gives some evidence that enabling manual aliasing doesn't blow something up */ +test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Publish with manual aliasing', async () => { + let clientConfig : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig(); + clientConfig.topicAliasingOptions = { + outboundBehavior : mqtt5.OutboundTopicAliasBehaviorType.Manual, + outboundCacheMaxSize : 10 + }; + + let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(clientConfig); + + let connectionSuccess = once(client, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); + let stopped = once(client, mqtt5.Mqtt5Client.STOPPED); + + client.start(); + + await connectionSuccess; + + let topic : string = `test-${uuid()}`; + let testPayload : Buffer = Buffer.from("Derp", "utf-8"); + let qos : mqtt5.QoS = mqtt5.QoS.AtLeastOnce; + + await client.publish({ + topicName: topic, + qos: qos, + payload: testPayload, + topicAlias: 1 + }); + + await client.publish({ + topicName: topic, + qos: qos, + payload: testPayload, + topicAlias: 1 + }); + + client.stop(); + await stopped; + + client.close(); +}); diff --git a/lib/native/mqtt5.ts b/lib/native/mqtt5.ts index fe94cd59e..9c1a19e2a 100644 --- a/lib/native/mqtt5.ts +++ b/lib/native/mqtt5.ts @@ -39,6 +39,111 @@ export * from '../common/mqtt5_packet'; */ export type WebsocketHandshakeTransform = (request: http.HttpRequest, done: (error_code?: number) => void) => void; +/** + * An enumeration that controls how the client applies topic aliasing to outbound publish packets. + * + * Topic alias behavior is described in https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113 + */ +export enum OutboundTopicAliasBehaviorType { + + /** + * Maps to Disabled. This keeps the client from being broken (by default) if the broker + * topic aliasing implementation has a problem. + */ + Default = 0, + + /** + * Outbound aliasing is the user's responsibility. Client will cache and use + * previously-established aliases if they fall within the negotiated limits of the connection. + * + * The user must still always submit a full topic in their publishes because disconnections disrupt + * topic alias mappings unpredictably. The client will properly use a requested alias when the most-recently-seen + * binding for a topic alias value matches the alias and topic in the publish packet. + */ + Manual = 1, + + /** + * (Recommended) The client will ignore any user-specified topic aliasing and instead use an LRU cache to drive + * alias usage. + */ + LRU = 2, + + /** + * Completely disable outbound topic aliasing. + */ + Disabled = 3, +} + +/** + * An enumeration that controls whether or not the client allows the broker to send publishes that use topic + * aliasing. + * + * Topic alias behavior is described in https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113 + */ +export enum InboundTopicAliasBehaviorType { + + /** + * Maps to Disabled. This keeps the client from being broken (by default) if the broker + * topic aliasing implementation has a problem. + */ + Default = 0, + + /** + * Allow the server to send PUBLISH packets to the client that use topic aliasing + */ + Enabled = 1, + + /** + * Forbid the server from sending PUBLISH packets to the client that use topic aliasing + */ + Disabled = 2, +} + +/** + * Configuration for all client topic aliasing behavior. + */ +export interface TopicAliasingOptions { + + /** + * Controls what kind of outbound topic aliasing behavior the client should attempt to use. + * + * If topic aliasing is not supported by the server, this setting has no effect and any attempts to directly + * manipulate the topic alias id in outbound publishes will be ignored. + * + * If left undefined, then outbound topic aliasing is disabled. + */ + outboundBehavior?: OutboundTopicAliasBehaviorType, + + /** + * If outbound topic aliasing is set to LRU, this controls the maximum size of the cache. If outbound topic + * aliasing is set to LRU and this is zero or undefined, a sensible default is used (25). If outbound topic + * aliasing is not set to LRU, then this setting has no effect. + * + * The final size of the cache is determined by the minimum of this setting and the value of the + * topic_alias_maximum property of the received CONNACK. If the received CONNACK does not have an explicit + * positive value for that field, outbound topic aliasing is disabled for the duration of that connection. + */ + outboundCacheMaxSize?: number, + + /** + * Controls whether or not the client allows the broker to use topic aliasing when sending publishes. Even if + * inbound topic aliasing is enabled, it is up to the server to choose whether or not to use it. + * + * If left undefined, then inbound topic aliasing is disabled. + */ + inboundBehavior?: InboundTopicAliasBehaviorType, + + /** + * If inbound topic aliasing is enabled, this will control the size of the inbound alias cache. If inbound + * aliases are enabled and this is zero or undefined, then a sensible default will be used (25). If inbound + * aliases are disabled, this setting has no effect. + * + * Behaviorally, this value overrides anything present in the topic_alias_maximum field of + * the CONNECT packet options. + */ + inboundCacheMaxSize?: number, +} + /** * Information about the client's queue of operations */ @@ -256,6 +361,15 @@ export interface Mqtt5ClientConfig { * @group Node-only */ extendedValidationAndFlowControlOptions? : ClientExtendedValidationAndFlowControl; + + /** + * Additional controls for client behavior with respect to topic alias usage. + * + * If this setting is left undefined, then topic aliasing behavior will be disabled. + * + * @group Node-only + */ + topicAliasingOptions? : TopicAliasingOptions } /** diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index f25461b99..0df45713f 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -47,6 +47,7 @@ static const char *AWS_NAPI_KEY_QOS = "qos"; static const char *AWS_NAPI_KEY_RETAIN = "retain"; static const char *AWS_NAPI_KEY_PAYLOAD_FORMAT = "payloadFormat"; static const char *AWS_NAPI_KEY_MESSAGE_EXPIRY_INTERVAL_SECONDS = "messageExpiryIntervalSeconds"; +static const char *AWS_NAPI_KEY_TOPIC_ALIAS = "topicAlias"; static const char *AWS_NAPI_KEY_RESPONSE_TOPIC = "responseTopic"; static const char *AWS_NAPI_KEY_CORRELATION_DATA = "correlationData"; static const char *AWS_NAPI_KEY_CONTENT_TYPE = "contentType"; @@ -87,6 +88,11 @@ static const char *AWS_NAPI_KEY_INCOMPLETE_OPERATION_SIZE = "incompleteOperation static const char *AWS_NAPI_KEY_UNACKED_OPERATION_COUNT = "unackedOperationCount"; static const char *AWS_NAPI_KEY_UNACKED_OPERATION_SIZE = "unackedOperationSize"; static const char *AWS_NAPI_KEY_TYPE = "type"; +static const char *AWS_NAPI_KEY_TOPIC_ALIASING_OPTIONS = "topicAliasingOptions"; +static const char *AWS_NAPI_KEY_OUTBOUND_BEHAVIOR = "outboundBehavior"; +static const char *AWS_NAPI_KEY_OUTBOUND_CACHE_MAX_SIZE = "outboundCacheMaxSize"; +static const char *AWS_NAPI_KEY_INBOUND_BEHAVIOR = "inboundBehavior"; +static const char *AWS_NAPI_KEY_INBOUND_CACHE_MAX_SIZE = "inboundCacheMaxSize"; /* * Binding object that outlives the associated napi wrapper object. When that object finalizes, then it's a signal @@ -1100,6 +1106,11 @@ static int s_create_napi_publish_packet( return AWS_OP_ERR; } + if (aws_napi_attach_object_property_optional_u16( + packet, env, AWS_NAPI_KEY_TOPIC_ALIAS, publish_view->topic_alias)) { + return AWS_OP_ERR; + } + if (aws_napi_attach_object_property_optional_string( packet, env, AWS_NAPI_KEY_RESPONSE_TOPIC, publish_view->response_topic)) { return AWS_OP_ERR; @@ -1345,6 +1356,7 @@ struct aws_napi_mqtt5_publish_storage { enum aws_mqtt5_payload_format_indicator payload_format; uint32_t message_expiry_interval_seconds; + uint16_t topic_alias; struct aws_byte_buf response_topic; struct aws_byte_cursor response_topic_cursor; @@ -1459,6 +1471,13 @@ static int s_init_publish_options_from_napi( &publish_storage->message_expiry_interval_seconds), { publish_options->message_expiry_interval_seconds = &publish_storage->message_expiry_interval_seconds; }); + PARSE_OPTIONAL_NAPI_PROPERTY( + AWS_NAPI_KEY_TOPIC_ALIAS, + "s_init_publish_options_from_napi", + aws_napi_get_named_property_as_uint16( + env, node_publish_config, AWS_NAPI_KEY_TOPIC_ALIAS, &publish_storage->topic_alias), + { publish_options->topic_alias = &publish_storage->topic_alias; }); + PARSE_OPTIONAL_NAPI_PROPERTY( AWS_NAPI_KEY_RESPONSE_TOPIC, "s_init_publish_options_from_napi", @@ -1677,6 +1696,58 @@ static int s_init_connect_options_from_napi( return AWS_OP_SUCCESS; } +/* Extract topic aliasing configuration from a node object */ +static int s_init_topic_aliasing_options_from_napi( + struct aws_mqtt5_client_binding *binding, + napi_env env, + napi_value node_topic_aliasing_config, + struct aws_mqtt5_client_topic_alias_options *topic_aliasing_options) { + + uint32_t outbound_behavior = 0; + PARSE_OPTIONAL_NAPI_PROPERTY( + AWS_NAPI_KEY_OUTBOUND_BEHAVIOR, + "s_init_topic_aliasing_options_from_napi", + aws_napi_get_named_property_as_uint32( + env, node_topic_aliasing_config, AWS_NAPI_KEY_OUTBOUND_BEHAVIOR, (uint32_t *)&outbound_behavior), + { + topic_aliasing_options->outbound_topic_alias_behavior = + (enum aws_mqtt5_client_outbound_topic_alias_behavior_type)outbound_behavior; + }); + + PARSE_OPTIONAL_NAPI_PROPERTY( + AWS_NAPI_KEY_OUTBOUND_CACHE_MAX_SIZE, + "s_init_topic_aliasing_options_from_napi", + aws_napi_get_named_property_as_uint16( + env, + node_topic_aliasing_config, + AWS_NAPI_KEY_OUTBOUND_CACHE_MAX_SIZE, + &topic_aliasing_options->outbound_alias_cache_max_size), + {}); + + uint32_t inbound_behavior = 0; + PARSE_OPTIONAL_NAPI_PROPERTY( + AWS_NAPI_KEY_INBOUND_BEHAVIOR, + "s_init_topic_aliasing_options_from_napi", + aws_napi_get_named_property_as_uint32( + env, node_topic_aliasing_config, AWS_NAPI_KEY_INBOUND_BEHAVIOR, (uint32_t *)&inbound_behavior), + { + topic_aliasing_options->inbound_topic_alias_behavior = + (enum aws_mqtt5_client_inbound_topic_alias_behavior_type)inbound_behavior; + }); + + PARSE_OPTIONAL_NAPI_PROPERTY( + AWS_NAPI_KEY_INBOUND_CACHE_MAX_SIZE, + "s_init_topic_aliasing_options_from_napi", + aws_napi_get_named_property_as_uint16( + env, + node_topic_aliasing_config, + AWS_NAPI_KEY_INBOUND_CACHE_MAX_SIZE, + &topic_aliasing_options->inbound_alias_cache_size), + {}); + + return AWS_OP_SUCCESS; +} + /* * Persistent storage for mqtt5 client options */ @@ -1814,6 +1885,7 @@ static int s_init_client_configuration_from_js_client_configuration( struct aws_mqtt5_client_options *client_options, struct aws_mqtt5_packet_connect_view *connect_options, struct aws_mqtt5_packet_publish_view *will_options, + struct aws_mqtt5_client_topic_alias_options *topic_aliasing_options, struct aws_napi_mqtt5_client_creation_storage *options_storage) { /* required config parameters */ @@ -1931,6 +2003,25 @@ static int s_init_client_configuration_from_js_client_configuration( } } + napi_value napi_value_topic_aliasing_options = NULL; + if (AWS_NGNPR_VALID_VALUE == aws_napi_get_named_property( + env, + node_client_config, + AWS_NAPI_KEY_TOPIC_ALIASING_OPTIONS, + napi_object, + &napi_value_topic_aliasing_options)) { + if (s_init_topic_aliasing_options_from_napi( + binding, env, napi_value_topic_aliasing_options, topic_aliasing_options)) { + AWS_LOGF_ERROR( + AWS_LS_NODEJS_CRT_GENERAL, + "s_init_client_configuration_from_js_client_configuration - failed to destructure topic aliasing " + "properties"); + return AWS_OP_ERR; + } + + client_options->topic_aliasing_options = topic_aliasing_options; + } + napi_value node_transform_websocket = NULL; if (AWS_NGNPR_VALID_VALUE == aws_napi_get_named_property( env, @@ -2028,6 +2119,9 @@ napi_value aws_napi_mqtt5_client_new(napi_env env, napi_callback_info info) { struct aws_mqtt5_packet_publish_view will_options; AWS_ZERO_STRUCT(will_options); + struct aws_mqtt5_client_topic_alias_options topic_aliasing_options; + AWS_ZERO_STRUCT(topic_aliasing_options); + struct aws_napi_mqtt5_client_creation_storage options_storage; AWS_ZERO_STRUCT(options_storage); @@ -2053,7 +2147,14 @@ napi_value aws_napi_mqtt5_client_new(napi_env env, napi_callback_info info) { } if (s_init_client_configuration_from_js_client_configuration( - env, node_client_config, binding, &client_options, &connect_options, &will_options, &options_storage)) { + env, + node_client_config, + binding, + &client_options, + &connect_options, + &will_options, + &topic_aliasing_options, + &options_storage)) { napi_throw_error( env, NULL, From b294263745ba11ac6386f17be8d7c1ea3197d9ca Mon Sep 17 00:00:00 2001 From: Joseph Klix Date: Wed, 15 Nov 2023 14:49:16 -0800 Subject: [PATCH 4/4] fix stale bot permissions (#504) --- .github/workflows/stale_issue.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/stale_issue.yml b/.github/workflows/stale_issue.yml index 5624eec4b..bdc95a6fd 100644 --- a/.github/workflows/stale_issue.yml +++ b/.github/workflows/stale_issue.yml @@ -9,6 +9,9 @@ jobs: cleanup: runs-on: ubuntu-latest name: Stale issue job + permissions: + issues: write + pull-requests: write steps: - uses: aws-actions/stale-issue-cleanup@v3 with: