From 76bfd37895381e56ce31f7c5a366b71709810503 Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Fri, 29 Nov 2024 07:58:22 -0800 Subject: [PATCH 1/9] Adding support for AWS IAM authentication for MSK --- lib/fluent/plugin/out_rdkafka2.rb | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index b306cfb..2038d90 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -101,6 +101,8 @@ class Fluent::Rdkafka2Output < Output config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"], :desc => 'Handle some of the error codes should be unrecoverable if specified' + config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK' + config_section :buffer do config_set_default :chunk_keys, ["topic"] end @@ -205,10 +207,17 @@ def add(level, message = nil) end end } + # HERE ----------------- Rdkafka::Config.logger = log config = build_config @rdkafka = Rdkafka::Config.new(config) + + if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER" + Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) + end + # HERE ----------------- + if @default_topic.nil? 
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true" @@ -289,6 +298,7 @@ def build_config config[:"sasl.password"] = @password if @password config[:"enable.idempotence"] = @idempotent if @idempotent + # sasl.mechanisms and security.protocol are set as rdkafka_options @rdkafka_options.each { |k, v| config[k.to_sym] = v } @@ -296,6 +306,26 @@ def build_config config end + def refresh_token(_config, _client_name) + print "refreshing token\n" + client = get_producer + signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region) + token = signer.generate_auth_token + + if token + client.oauthbearer_set_token( + token: token.token, + lifetime_ms: token.expiration_time_ms, + principal_name: "kafka-cluster" + ) + else + client.oauthbearer_set_token_failure( + "Failed to generate token." + ) + end + end + + # HERE ----------------- def start if @share_producer @shared_producer = @rdkafka.producer @@ -306,6 +336,7 @@ def start super end + # HERE ----------------- def multi_workers_ready? 
true From 4ce442ab0ee37d32a44bb399c8a8e9ac72698727 Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Fri, 29 Nov 2024 08:34:30 -0800 Subject: [PATCH 2/9] Add required gems for MSK IAM support --- Gemfile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index db01f5a..25b1013 100644 --- a/Gemfile +++ b/Gemfile @@ -3,4 +3,6 @@ source 'https://rubygems.org' # Specify your gem's dependencies in fluent-plugin-kafka.gemspec gemspec -gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA'] +gem "json", "2.7.3" # locked to 2.7.3 for compatibility with AWS SDK code +gem "aws-msk-iam-sasl-signer" +gem 'rdkafka' From 8c7ac805cd1d6e2a83aefbf60d10221e96fa1efd Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Fri, 29 Nov 2024 10:19:38 -0800 Subject: [PATCH 3/9] Move dependencies to gemspec file --- Gemfile | 4 +--- fluent-plugin-kafka.gemspec | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Gemfile b/Gemfile index 25b1013..db01f5a 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,4 @@ source 'https://rubygems.org' # Specify your gem's dependencies in fluent-plugin-kafka.gemspec gemspec -gem "json", "2.7.3" # locked to 2.7.3 for compatibility with AWS SDK code -gem "aws-msk-iam-sasl-signer" -gem 'rdkafka' +gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA'] diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index 3620ddb..ea118bb 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -19,6 +19,9 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.10.58", "< 2"] gem.add_dependency 'ltsv' gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' + gem.add_dependency 'rdkafka' + gem.add_dependency 'aws-msk-iam-sasl-signer' + gem.add_dependency 'json', '2.7.3' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" 
gem.add_development_dependency "test-unit-rr", "~> 1.0" From ed87e4866bf41178a1574bc4cc979e12c679f421 Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Mon, 2 Dec 2024 11:23:39 -0800 Subject: [PATCH 4/9] Fix issues revealed in test setup; Add fork info to README --- Gemfile | 1 + README.md | 41 ++++++++++++++++++++++++++++++- fluent-plugin-kafka.gemspec | 1 - lib/fluent/plugin/out_rdkafka2.rb | 16 +++++++++--- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/Gemfile b/Gemfile index db01f5a..0e3cf2b 100644 --- a/Gemfile +++ b/Gemfile @@ -3,4 +3,5 @@ source 'https://rubygems.org' # Specify your gem's dependencies in fluent-plugin-kafka.gemspec gemspec +gem 'json', '2.7.3' # override of 2.7.4 version gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA'] diff --git a/README.md b/README.md index 814d40e..9147728 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,45 @@ If you want to use zookeeper related parameters, you also need to install zookee ## Usage +### :exclamation: In this fork: MSK IAM Authentication Support for `rdkafka2` Output Type + +This fork adds support for using MSK IAM authentication with the `rdkafka2` output type in Fluentd. Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library. + +The `aws-msk-iam-sasl-signer-ruby` library provides an [example](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby/tree/main/examples/rdkafka) for generating the OAuthBearer token using rdkafka, which is one of the core Kafka libraries supported by the Fluentd fluent-plugin-kafka plugin. This fork integrates that example into the `Fluent::Rdkafka2Output` class, enabling AWS IAM authentication. 
+ +The key change is the inclusion of a refresh callback: +```ruby +Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) +``` +This callback triggers token generation when needed, ensuring continuous authentication with the MSK cluster. + +#### Configuration Example +To enable this feature, configure your Fluentd output as follows: + +``` +<match **> + @type rdkafka2 + # Kafka brokers to connect to (typically port 9098 or 9198 for IAM authentication) + brokers <broker_host>:9098 + # Topic to write events to + topic_key test-topic-1 + default_topic test-topic-1 + + # AWS Region (required) + aws_msk_region us-east-1 + + # Use a shared producer for the connection (required) + share_producer true + + # MSK IAM authentication settings (required) + rdkafka_options { + "security.protocol": "sasl_ssl", + "sasl.mechanisms": "OAUTHBEARER" + } +</match> +``` +With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication. + ### Common parameters #### SSL authentication @@ -563,7 +602,7 @@ You need to install rdkafka gem. `rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter: -- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`. +- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`. If you use v0.12, use `rdkafka` instead. 
diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index ea118bb..ad57ea9 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -21,7 +21,6 @@ Gem::Specification.new do |gem| gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' gem.add_dependency 'rdkafka' gem.add_dependency 'aws-msk-iam-sasl-signer' - gem.add_dependency 'json', '2.7.3' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 2038d90..c76e3f5 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -4,6 +4,7 @@ require 'fluent/plugin/kafka_plugin_util' require 'rdkafka' +require 'aws_msk_iam_sasl_signer' begin rdkafka_version = Gem::Version::create(Rdkafka::VERSION) @@ -307,8 +308,14 @@ def build_config end def refresh_token(_config, _client_name) - print "refreshing token\n" + log.info("+--- Refreshing token") client = get_producer + # This will happen once upon initialization and is expected to fail, as the producer isn't set yet + # We will set the token manually after creation and after that this refresh method will work + unless client + log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)") + return + end signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region) token = signer.generate_auth_token @@ -325,10 +332,14 @@ def refresh_token(_config, _client_name) end end - # HERE ----------------- def start if @share_producer @shared_producer = @rdkafka.producer + log.info("Created shared producer") + if @aws_msk_region + refresh_token(nil, nil) + log.info("Set initial token for shared producer") + end else @producers = {} @producers_mutex = Mutex.new @@ -336,7 +347,6 @@ def start super end - # HERE ----------------- def multi_workers_ready? 
true From 23edf2227fa135c0511a96b981fbbfdef4f48f11 Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Sat, 7 Dec 2024 14:44:29 -0800 Subject: [PATCH 5/9] Adding gems conditionally --- Gemfile | 11 +++++++++-- fluent-plugin-kafka.gemspec | 2 -- lib/fluent/plugin/out_rdkafka2.rb | 4 +--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Gemfile b/Gemfile index 0e3cf2b..45898ca 100644 --- a/Gemfile +++ b/Gemfile @@ -3,5 +3,12 @@ source 'https://rubygems.org' # Specify your gem's dependencies in fluent-plugin-kafka.gemspec gemspec -gem 'json', '2.7.3' # override of 2.7.4 version -gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA'] +if ENV['USE_RDKAFKA'] + gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] + min_version = Gem::Version.new('0.16.0') + min_range_version = Gem::Version.new(ENV['RDKAFKA_VERSION_MIN_RANGE'].split(' ').last) + if min_range_version >= min_version + gem 'aws-msk-iam-sasl-signer' + gem 'json', '2.7.3' # override of 2.7.4 version + end +end diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index ad57ea9..3620ddb 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -19,8 +19,6 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.10.58", "< 2"] gem.add_dependency 'ltsv' gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' - gem.add_dependency 'rdkafka' - gem.add_dependency 'aws-msk-iam-sasl-signer' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index c76e3f5..9a09ce0 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -4,7 +4,6 @@ require 'fluent/plugin/kafka_plugin_util' require 'rdkafka' -require 'aws_msk_iam_sasl_signer' begin rdkafka_version = 
Gem::Version::create(Rdkafka::VERSION) @@ -16,6 +15,7 @@ require_relative 'rdkafka_patch/0_14_0' elsif rdkafka_version >= Gem::Version.create('0.16.0') require_relative 'rdkafka_patch/0_16_0' + require 'aws_msk_iam_sasl_signer' end rescue LoadError, NameError raise "unable to patch rdkafka." @@ -208,7 +208,6 @@ def add(level, message = nil) end end } - # HERE ----------------- Rdkafka::Config.logger = log config = build_config @rdkafka = Rdkafka::Config.new(config) @@ -217,7 +216,6 @@ def add(level, message = nil) if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER" Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) end - # HERE ----------------- if @default_topic.nil? if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error From 3d63b3fdf3e91cbd3b5e4d5e357f7d4c928a1bbf Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Sat, 7 Dec 2024 19:16:10 -0800 Subject: [PATCH 6/9] Moving required gems into gemspec --- Gemfile | 10 ---------- fluent-plugin-kafka.gemspec | 6 ++++++ lib/fluent/plugin/out_rdkafka2.rb | 2 +- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/Gemfile b/Gemfile index 45898ca..d5c272e 100644 --- a/Gemfile +++ b/Gemfile @@ -2,13 +2,3 @@ source 'https://rubygems.org' # Specify your gem's dependencies in fluent-plugin-kafka.gemspec gemspec - -if ENV['USE_RDKAFKA'] - gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] - min_version = Gem::Version.new('0.16.0') - min_range_version = Gem::Version.new(ENV['RDKAFKA_VERSION_MIN_RANGE'].split(' ').last) - if min_range_version >= min_version - gem 'aws-msk-iam-sasl-signer' - gem 'json', '2.7.3' # override of 2.7.4 version - end -end diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index 3620ddb..d6d3371 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -19,6 +19,12 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 
0.10.58", "< 2"] gem.add_dependency 'ltsv' gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' + + if ENV['USE_RDKAFKA'] + gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']] + gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1' + end + gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 9a09ce0..f3bfbdd 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -15,8 +15,8 @@ require_relative 'rdkafka_patch/0_14_0' elsif rdkafka_version >= Gem::Version.create('0.16.0') require_relative 'rdkafka_patch/0_16_0' - require 'aws_msk_iam_sasl_signer' end + require 'aws_msk_iam_sasl_signer' if rdkafka_version >= Gem::Version.create('0.16.0') rescue LoadError, NameError raise "unable to patch rdkafka." end From a0fac67e364d57520c80ffd4b78a0eef05bbe99a Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Sat, 7 Dec 2024 19:31:31 -0800 Subject: [PATCH 7/9] Make aws signer lib dependent on min ruby version --- fluent-plugin-kafka.gemspec | 4 +++- lib/fluent/plugin/out_rdkafka2.rb | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index d6d3371..c7700a4 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -22,7 +22,9 @@ Gem::Specification.new do |gem| if ENV['USE_RDKAFKA'] gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']] - gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1' + if Gem::Version.new('3.0' >= Gem::Version.new(RUBY_VERSION) + gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1' + end end gem.add_development_dependency "rake", ">= 0.9.2" diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 
f3bfbdd..6301e19 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -16,11 +16,14 @@ elsif rdkafka_version >= Gem::Version.create('0.16.0') require_relative 'rdkafka_patch/0_16_0' end - require 'aws_msk_iam_sasl_signer' if rdkafka_version >= Gem::Version.create('0.16.0') rescue LoadError, NameError raise "unable to patch rdkafka." end +if Gem::Version.create(RUBY_VERSION) >= Gem::Version.create('3.0') + require 'aws-msk-iam-sasl-signer' +end + module Fluent::Plugin class Fluent::Rdkafka2Output < Output Fluent::Plugin.register_output('rdkafka2', self) From fef89b849602df1e4952c7a21ac3c0882fa36ef4 Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Sun, 8 Dec 2024 14:11:43 -0800 Subject: [PATCH 8/9] Fix typo in fluent-plugin-kafka.gemspec --- fluent-plugin-kafka.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index c7700a4..db3381f 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |gem| if ENV['USE_RDKAFKA'] gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']] - if Gem::Version.new('3.0' >= Gem::Version.new(RUBY_VERSION) + if Gem::Version.new('3.0') >= Gem::Version.new(RUBY_VERSION) gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1' end end From d3d5845623040539eee9cd25e62a78e12b065897 Mon Sep 17 00:00:00 2001 From: Andrea Singh Date: Sun, 8 Dec 2024 18:08:12 -0800 Subject: [PATCH 9/9] Fix conditional in gemspec --- fluent-plugin-kafka.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index db3381f..ceace4f 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |gem| if ENV['USE_RDKAFKA'] gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']] 
- if Gem::Version.new('3.0') >= Gem::Version.new(RUBY_VERSION) + if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0') gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1' end end