otelcolconvert: support converting kafkareceiver
Converting this component is more complicated than for other receivers because its configuration relies on internal struct types. We work around the internal structs by converting to `map[string]any` where necessary, including a conversion back to a concrete struct type where one is exposed to us.

Because this component uses internal types, the test covers more fields than normal to ensure the mapstructure usage is correct.

Closes #6425.
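
The round trip described above can be sketched in a few lines. This is a hypothetical, self-contained example, where `hiddenConfig` stands in for an internal upstream type rather than a real kafkareceiver struct:

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// hiddenConfig is a stand-in for a struct defined in an internal upstream
// package; the converter only sees such values as opaque fields.
type hiddenConfig struct {
	Username string `mapstructure:"username"`
	Password string `mapstructure:"password"`
}

func main() {
	in := hiddenConfig{Username: "user", Password: "pass"}

	// Flatten the opaque value into a map so fields become addressable by key.
	var m map[string]any
	if err := mapstructure.Decode(in, &m); err != nil {
		panic(err)
	}
	fmt.Println(m["username"]) // user

	// Where a concrete type is exported, decode the map back into it.
	var out hiddenConfig
	if err := mapstructure.Decode(m, &out); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out) // {Username:user Password:pass}
}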
rfratto committed Feb 21, 2024
1 parent 77178cf commit 99cd42c
Showing 4 changed files with 302 additions and 0 deletions.
13 changes: 13 additions & 0 deletions converter/internal/otelcolconvert/converter_helpers.go
@@ -7,6 +7,7 @@ import (
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/river/token"
"github.com/grafana/river/token/builder"
"github.com/mitchellh/mapstructure"
)

// This file contains shared helpers for converters to use.
@@ -40,3 +41,15 @@ func toTokenizedConsumers(components []componentID) []otelcol.Consumer {

	return res
}

// encodeMapstruct uses mapstructure to convert the given argument into a
// map[string]any. This is useful for converting configuration sections of
// OpenTelemetry components whose configuration type is hidden in an
// internal package.
func encodeMapstruct(v any) map[string]any {
	var res map[string]any
	if err := mapstructure.Decode(v, &res); err != nil {
		panic(err)
	}
	return res
}
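
A rough usage sketch of this helper (the `sasl` and `mechanism` keys mirror the mapstructure tags used by the kafkareceiver converter below; nil input leaves the result as a nil map, which callers treat as an unset section):

auth := encodeMapstruct(cfg.Authentication) // internal type -> map[string]any
if sasl := encodeMapstruct(auth["sasl"]); sasl != nil {
	mechanism := sasl["mechanism"].(string)
	_ = mechanism
}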
195 changes: 195 additions & 0 deletions converter/internal/otelcolconvert/converter_kafkareceiver.go
@@ -0,0 +1,195 @@
package otelcolconvert

import (
	"fmt"

	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/receiver/kafka"
	"github.com/grafana/agent/converter/diag"
	"github.com/grafana/agent/converter/internal/common"
	"github.com/grafana/river/rivertypes"
	"github.com/mitchellh/mapstructure"
	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configtls"
)

func init() {
	converters = append(converters, kafkaReceiverConverter{})
}

type kafkaReceiverConverter struct{}

func (kafkaReceiverConverter) Factory() component.Factory { return kafkareceiver.NewFactory() }

func (kafkaReceiverConverter) InputComponentName() string { return "" }

func (kafkaReceiverConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
	var diags diag.Diagnostics

	label := state.FlowComponentLabel()

	args := toKafkaReceiver(state, id, cfg.(*kafkareceiver.Config))
	block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "kafka"}, label, args)

	diags.Add(
		diag.SeverityLevelInfo,
		fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
	)

	state.Body().AppendBlock(block)
	return diags
}

func toKafkaReceiver(state *state, id component.InstanceID, cfg *kafkareceiver.Config) *kafka.Arguments {
	var (
		nextMetrics = state.Next(id, component.DataTypeMetrics)
		nextLogs    = state.Next(id, component.DataTypeLogs)
		nextTraces  = state.Next(id, component.DataTypeTraces)
	)

	return &kafka.Arguments{
		Brokers:         cfg.Brokers,
		ProtocolVersion: cfg.ProtocolVersion,
		Topic:           cfg.Topic,
		Encoding:        cfg.Encoding,
		GroupID:         cfg.GroupID,
		ClientID:        cfg.ClientID,
		InitialOffset:   cfg.InitialOffset,

		Authentication:   toKafkaAuthentication(encodeMapstruct(cfg.Authentication)),
		Metadata:         toKafkaMetadata(cfg.Metadata),
		AutoCommit:       toKafkaAutoCommit(cfg.AutoCommit),
		MessageMarking:   toKafkaMessageMarking(cfg.MessageMarking),
		HeaderExtraction: toKafkaHeaderExtraction(cfg.HeaderExtraction),

		Output: &otelcol.ConsumerArguments{
			Metrics: toTokenizedConsumers(nextMetrics),
			Logs:    toTokenizedConsumers(nextLogs),
			Traces:  toTokenizedConsumers(nextTraces),
		},
	}
}

func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments {
	return kafka.AuthenticationArguments{
		Plaintext: toKafkaPlaintext(encodeMapstruct(cfg["plain_text"])),
		SASL:      toKafkaSASL(encodeMapstruct(cfg["sasl"])),
		TLS:       toKafkaTLSClientArguments(encodeMapstruct(cfg["tls"])),
		Kerberos:  toKafkaKerberos(encodeMapstruct(cfg["kerberos"])),
	}
}

func toKafkaPlaintext(cfg map[string]any) *kafka.PlaintextArguments {
	if cfg == nil {
		return nil
	}

	return &kafka.PlaintextArguments{
		Username: cfg["username"].(string),
		Password: rivertypes.Secret(cfg["password"].(string)),
	}
}

func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments {
	if cfg == nil {
		return nil
	}

	return &kafka.SASLArguments{
		Username:  cfg["username"].(string),
		Password:  rivertypes.Secret(cfg["password"].(string)),
		Mechanism: cfg["mechanism"].(string),
		Version:   cfg["version"].(int),
		AWSMSK:    toKafkaAWSMSK(encodeMapstruct(cfg["aws_msk"])),
	}
}

func toKafkaAWSMSK(cfg map[string]any) kafka.AWSMSKArguments {
	if cfg == nil {
		return kafka.AWSMSKArguments{}
	}

	return kafka.AWSMSKArguments{
		Region:     cfg["region"].(string),
		BrokerAddr: cfg["broker_addr"].(string),
	}
}

func toKafkaTLSClientArguments(cfg map[string]any) *otelcol.TLSClientArguments {
	if cfg == nil {
		return nil
	}

	// Convert cfg back into the concrete configtls.TLSClientSetting, which is
	// exported upstream and can be handled by the shared TLS converter.
	var tlsSettings configtls.TLSClientSetting
	if err := mapstructure.Decode(cfg, &tlsSettings); err != nil {
		panic(err)
	}

	res := toTLSClientArguments(tlsSettings)
	return &res
}

func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments {
	if cfg == nil {
		return nil
	}

	return &kafka.KerberosArguments{
		ServiceName: cfg["service_name"].(string),
		Realm:       cfg["realm"].(string),
		UseKeyTab:   cfg["use_keytab"].(bool),
		Username:    cfg["username"].(string),
		Password:    rivertypes.Secret(cfg["password"].(string)),
		ConfigPath:  cfg["config_file"].(string),
		KeyTabPath:  cfg["keytab_file"].(string),
	}
}

func toKafkaMetadata(cfg kafkaexporter.Metadata) kafka.MetadataArguments {
	return kafka.MetadataArguments{
		IncludeAllTopics: cfg.Full,
		Retry:            toKafkaRetry(cfg.Retry),
	}
}

func toKafkaRetry(cfg kafkaexporter.MetadataRetry) kafka.MetadataRetryArguments {
	return kafka.MetadataRetryArguments{
		MaxRetries: cfg.Max,
		Backoff:    cfg.Backoff,
	}
}

func toKafkaAutoCommit(cfg kafkareceiver.AutoCommit) kafka.AutoCommitArguments {
	return kafka.AutoCommitArguments{
		Enable:   cfg.Enable,
		Interval: cfg.Interval,
	}
}

func toKafkaMessageMarking(cfg kafkareceiver.MessageMarking) kafka.MessageMarkingArguments {
	return kafka.MessageMarkingArguments{
		AfterExecution:      cfg.After,
		IncludeUnsuccessful: cfg.OnError,
	}
}

func toKafkaHeaderExtraction(cfg kafkareceiver.HeaderExtraction) kafka.HeaderExtraction {
	// If cfg.Headers is nil, we set it to an empty slice to align with the
	// default of the Flow component; if this isn't done, then default headers
	// will be explicitly set as `[]` in the generated Flow configuration file,
	// which may confuse users.
	if cfg.Headers == nil {
		cfg.Headers = []string{}
	}

	return kafka.HeaderExtraction{
		ExtractHeaders: cfg.ExtractHeaders,
		Headers:        cfg.Headers,
	}
}
46 changes: 46 additions & 0 deletions converter/internal/otelcolconvert/testdata/kafka.river
@@ -0,0 +1,46 @@
otelcol.receiver.kafka "default" {
	brokers          = ["broker:9092"]
	protocol_version = "2.0.0"

	authentication {
		plaintext {
			username = "fakeusername"
			password = "fakepassword"
		}

		sasl {
			username  = "fakeusername"
			password  = "fakepassword"
			mechanism = "somemechanism"
			version   = 5

			aws_msk {
				region      = "us-east-1"
				broker_addr = "broker:9092"
			}
		}

		tls {
			insecure = true
		}

		kerberos {
			service_name = "someservice"
			realm        = "myrealm"
			username     = "fakeusername"
			password     = "fakepassword"
		}
	}

	output {
		metrics = [otelcol.exporter.otlp.default.input]
		logs    = [otelcol.exporter.otlp.default.input]
		traces  = [otelcol.exporter.otlp.default.input]
	}
}

otelcol.exporter.otlp "default" {
	client {
		endpoint = "database:4317"
	}
}
48 changes: 48 additions & 0 deletions converter/internal/otelcolconvert/testdata/kafka.yaml
@@ -0,0 +1,48 @@
receivers:
  kafka:
    brokers: ['broker:9092']
    protocol_version: 2.0.0
    auth:
      plain_text:
        username: fakeusername
        password: fakepassword
      sasl:
        username: fakeusername
        password: fakepassword
        mechanism: somemechanism
        version: 5
        aws_msk:
          region: us-east-1
          broker_addr: broker:9092
      tls:
        insecure: true
      kerberos:
        username: fakeusername
        password: fakepassword
        service_name: someservice
        realm: myrealm

exporters:
  otlp:
    # Our defaults have drifted from upstream, so we explicitly set our
    # defaults below (balancer_name and queue_size).
    endpoint: database:4317
    balancer_name: pick_first
    sending_queue:
      queue_size: 5000

service:
  pipelines:
    metrics:
      receivers: [kafka]
      processors: []
      exporters: [otlp]
    logs:
      receivers: [kafka]
      processors: []
      exporters: [otlp]
    traces:
      receivers: [kafka]
      processors: []
      exporters: [otlp]
