From 99cd42c57eb3279a520ec6e890d90d76725bcc88 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Wed, 21 Feb 2024 12:45:52 -0500
Subject: [PATCH] otelcolconvert: support converting kafkareceiver

Converting this component is more complicated than converting other
receivers because it uses internal struct types. We work around the
internal structs by converting to `map[string]any` when necessary,
including a conversion back to a concrete struct type where one is
exposed to us.

Because this component uses internal types, the test covers more fields
than usual to ensure the mapstructure usage is correct.

Closes #6425.
---
 .../otelcolconvert/converter_helpers.go       |  13 ++
 .../otelcolconvert/converter_kafkareceiver.go | 195 ++++++++++++++++++
 .../otelcolconvert/testdata/kafka.river       |  46 +++++
 .../otelcolconvert/testdata/kafka.yaml        |  48 +++++
 4 files changed, 302 insertions(+)
 create mode 100644 converter/internal/otelcolconvert/converter_kafkareceiver.go
 create mode 100644 converter/internal/otelcolconvert/testdata/kafka.river
 create mode 100644 converter/internal/otelcolconvert/testdata/kafka.yaml

diff --git a/converter/internal/otelcolconvert/converter_helpers.go b/converter/internal/otelcolconvert/converter_helpers.go
index 9a74c930995c..8c9ebf2d5be1 100644
--- a/converter/internal/otelcolconvert/converter_helpers.go
+++ b/converter/internal/otelcolconvert/converter_helpers.go
@@ -7,6 +7,7 @@ import (
 	"github.com/grafana/agent/component/otelcol"
 	"github.com/grafana/river/token"
 	"github.com/grafana/river/token/builder"
+	"github.com/mitchellh/mapstructure"
 )
 
 // This file contains shared helpers for converters to use.
@@ -40,3 +41,15 @@ func toTokenizedConsumers(components []componentID) []otelcol.Consumer {
 
 	return res
 }
+
+// encodeMapstruct uses mapstructure tags to convert the given argument into a
+// map[string]any. This is useful for converting configuration sections of
+// OpenTelemetry components where the configuration type is hidden in an
+// internal package.
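+//
+// A minimal illustration (hypothetical type, used only for this doc comment):
+//
+//	type example struct {
+//		Username string `mapstructure:"username"`
+//	}
+//
+//	encodeMapstruct(example{Username: "u"})
+//	// => map[string]any{"username": "u"}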
+func encodeMapstruct(v any) map[string]any {
+	var res map[string]any
+	if err := mapstructure.Decode(v, &res); err != nil {
+		panic(err)
+	}
+	return res
+}
diff --git a/converter/internal/otelcolconvert/converter_kafkareceiver.go b/converter/internal/otelcolconvert/converter_kafkareceiver.go
new file mode 100644
index 000000000000..e11ac6ef56e4
--- /dev/null
+++ b/converter/internal/otelcolconvert/converter_kafkareceiver.go
@@ -0,0 +1,195 @@
+package otelcolconvert
+
+import (
+	"fmt"
+
+	"github.com/grafana/agent/component/otelcol"
+	"github.com/grafana/agent/component/otelcol/receiver/kafka"
+	"github.com/grafana/agent/converter/diag"
+	"github.com/grafana/agent/converter/internal/common"
+	"github.com/grafana/river/rivertypes"
+	"github.com/mitchellh/mapstructure"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configtls"
+)
+
+func init() {
+	converters = append(converters, kafkaReceiverConverter{})
+}
+
+type kafkaReceiverConverter struct{}
+
+func (kafkaReceiverConverter) Factory() component.Factory { return kafkareceiver.NewFactory() }
+
+func (kafkaReceiverConverter) InputComponentName() string { return "" }
+
+func (kafkaReceiverConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
+	var diags diag.Diagnostics
+
+	label := state.FlowComponentLabel()
+
+	args := toKafkaReceiver(state, id, cfg.(*kafkareceiver.Config))
+	block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "kafka"}, label, args)
+
+	diags.Add(
+		diag.SeverityLevelInfo,
+		fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
+	)
+
+	state.Body().AppendBlock(block)
+	return diags
+}
+
+func toKafkaReceiver(state *state, id component.InstanceID, cfg *kafkareceiver.Config) *kafka.Arguments {
+	var (
+		nextMetrics = state.Next(id, component.DataTypeMetrics)
+		nextLogs    = state.Next(id, component.DataTypeLogs)
+		nextTraces  = state.Next(id, component.DataTypeTraces)
+	)
+
+	return &kafka.Arguments{
+		Brokers:         cfg.Brokers,
+		ProtocolVersion: cfg.ProtocolVersion,
+		Topic:           cfg.Topic,
+		Encoding:        cfg.Encoding,
+		GroupID:         cfg.GroupID,
+		ClientID:        cfg.ClientID,
+		InitialOffset:   cfg.InitialOffset,
+
+		Authentication:   toKafkaAuthentication(encodeMapstruct(cfg.Authentication)),
+		Metadata:         toKafkaMetadata(cfg.Metadata),
+		AutoCommit:       toKafkaAutoCommit(cfg.AutoCommit),
+		MessageMarking:   toKafkaMessageMarking(cfg.MessageMarking),
+		HeaderExtraction: toKafkaHeaderExtraction(cfg.HeaderExtraction),
+
+		Output: &otelcol.ConsumerArguments{
+			Metrics: toTokenizedConsumers(nextMetrics),
+			Logs:    toTokenizedConsumers(nextLogs),
+			Traces:  toTokenizedConsumers(nextTraces),
+		},
+	}
+}
+
+func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments {
+	return kafka.AuthenticationArguments{
+		Plaintext: toKafkaPlaintext(encodeMapstruct(cfg["plain_text"])),
+		SASL:      toKafkaSASL(encodeMapstruct(cfg["sasl"])),
+		TLS:       toKafkaTLSClientArguments(encodeMapstruct(cfg["tls"])),
+		Kerberos:  toKafkaKerberos(encodeMapstruct(cfg["kerberos"])),
+	}
+}
+
+func toKafkaPlaintext(cfg map[string]any) *kafka.PlaintextArguments {
+	if cfg == nil {
+		return nil
+	}
+
+	return &kafka.PlaintextArguments{
+		Username: cfg["username"].(string),
+		Password: rivertypes.Secret(cfg["password"].(string)),
+	}
+}
+
+func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments {
+	if cfg == nil {
+		return nil
+	}
+
+	return &kafka.SASLArguments{
+		Username:  cfg["username"].(string),
+		Password:  rivertypes.Secret(cfg["password"].(string)),
+		Mechanism: cfg["mechanism"].(string),
+		Version:   cfg["version"].(int),
+		AWSMSK:    toKafkaAWSMSK(encodeMapstruct(cfg["aws_msk"])),
+	}
+}
+
+func toKafkaAWSMSK(cfg map[string]any) kafka.AWSMSKArguments {
+	if cfg == nil {
+		return kafka.AWSMSKArguments{}
+	}
+
+	return kafka.AWSMSKArguments{
+		Region:     cfg["region"].(string),
+		BrokerAddr: cfg["broker_addr"].(string),
+	}
+}
+
+func toKafkaTLSClientArguments(cfg map[string]any) *otelcol.TLSClientArguments {
+	if cfg == nil {
+		return nil
+	}
+
+	// Convert cfg back to the concrete configtls.TLSClientSetting type, which
+	// is exposed to us, so we can reuse the shared toTLSClientArguments helper.
+	var tlsSettings configtls.TLSClientSetting
+	if err := mapstructure.Decode(cfg, &tlsSettings); err != nil {
+		panic(err)
+	}
+
+	res := toTLSClientArguments(tlsSettings)
+	return &res
+}
+
+func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments {
+	if cfg == nil {
+		return nil
+	}
+
+	return &kafka.KerberosArguments{
+		ServiceName: cfg["service_name"].(string),
+		Realm:       cfg["realm"].(string),
+		UseKeyTab:   cfg["use_keytab"].(bool),
+		Username:    cfg["username"].(string),
+		Password:    rivertypes.Secret(cfg["password"].(string)),
+		ConfigPath:  cfg["config_file"].(string),
+		KeyTabPath:  cfg["keytab_file"].(string),
+	}
+}
+
+func toKafkaMetadata(cfg kafkaexporter.Metadata) kafka.MetadataArguments {
+	return kafka.MetadataArguments{
+		IncludeAllTopics: cfg.Full,
+		Retry:            toKafkaRetry(cfg.Retry),
+	}
+}
+
+func toKafkaRetry(cfg kafkaexporter.MetadataRetry) kafka.MetadataRetryArguments {
+	return kafka.MetadataRetryArguments{
+		MaxRetries: cfg.Max,
+		Backoff:    cfg.Backoff,
+	}
+}
+
+func toKafkaAutoCommit(cfg kafkareceiver.AutoCommit) kafka.AutoCommitArguments {
+	return kafka.AutoCommitArguments{
+		Enable:   cfg.Enable,
+		Interval: cfg.Interval,
+	}
+}
+
+func toKafkaMessageMarking(cfg kafkareceiver.MessageMarking) kafka.MessageMarkingArguments {
+	return kafka.MessageMarkingArguments{
+		AfterExecution:      cfg.After,
+		IncludeUnsuccessful: cfg.OnError,
+	}
+}
+
+func toKafkaHeaderExtraction(cfg kafkareceiver.HeaderExtraction) kafka.HeaderExtraction {
+	// If cfg.Headers is nil, we set it to an empty slice to align with the
+	// default of the Flow component; if this isn't done, the default headers
+	// will be explicitly set as `[]` in the generated Flow configuration file,
+	// which may confuse users.
+ if cfg.Headers == nil { + cfg.Headers = []string{} + } + + return kafka.HeaderExtraction{ + ExtractHeaders: cfg.ExtractHeaders, + Headers: cfg.Headers, + } +} diff --git a/converter/internal/otelcolconvert/testdata/kafka.river b/converter/internal/otelcolconvert/testdata/kafka.river new file mode 100644 index 000000000000..b98eabaf2f76 --- /dev/null +++ b/converter/internal/otelcolconvert/testdata/kafka.river @@ -0,0 +1,46 @@ +otelcol.receiver.kafka "default" { + brokers = ["broker:9092"] + protocol_version = "2.0.0" + + authentication { + plaintext { + username = "fakeusername" + password = "fakepassword" + } + + sasl { + username = "fakeusername" + password = "fakepassword" + mechanism = "somemechanism" + version = 5 + + aws_msk { + region = "us-east-1" + broker_addr = "broker:9092" + } + } + + tls { + insecure = true + } + + kerberos { + service_name = "someservice" + realm = "myrealm" + username = "fakeusername" + password = "fakepassword" + } + } + + output { + metrics = [otelcol.exporter.otlp.default.input] + logs = [otelcol.exporter.otlp.default.input] + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/converter/internal/otelcolconvert/testdata/kafka.yaml b/converter/internal/otelcolconvert/testdata/kafka.yaml new file mode 100644 index 000000000000..456c87a007e9 --- /dev/null +++ b/converter/internal/otelcolconvert/testdata/kafka.yaml @@ -0,0 +1,48 @@ +receivers: + kafka: + brokers: ['broker:9092'] + protocol_version: 2.0.0 + auth: + plain_text: + username: fakeusername + password: fakepassword + sasl: + username: fakeusername + password: fakepassword + mechanism: somemechanism + version: 5 + aws_msk: + region: us-east-1 + broker_addr: broker:9092 + tls: + insecure: true + kerberos: + username: fakeusername + password: fakepassword + service_name: someservice + realm: myrealm + + +exporters: + otlp: + # Our defaults have drifted from upstream, so we explicitly set our + # defaults below (balancer_name and queue_size). + endpoint: database:4317 + balancer_name: pick_first + sending_queue: + queue_size: 5000 + +service: + pipelines: + metrics: + receivers: [kafka] + processors: [] + exporters: [otlp] + logs: + receivers: [kafka] + processors: [] + exporters: [otlp] + traces: + receivers: [kafka] + processors: [] + exporters: [otlp]