Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed the kafka_types.go file and introduced a mapvalue struct. #1328

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions apis/fluentbit/v1alpha2/plugins/output/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,27 @@

// +kubebuilder:object:generate:=true

type mapvalue struct {
StringVal string `json:"format,omitempty"`
Secret *plugins.Secret `json:"format,omitempty"`

Check failure on line 14 in apis/fluentbit/v1alpha2/plugins/output/kafka_types.go

View workflow job for this annotation

GitHub Actions / Basic test and verify

struct field Secret repeats json tag "format" also at kafka_types.go:13
}

// Implement the GetStringVal method to satisfy the SecretProvider interface
func (m mapvalue) GetStringVal() string {
return m.StringVal
}

// mapvalue implicitly implements params.SecretProvider because SecretProvider is an empty interface
var _ params.SecretProvider = mapvalue{}

func convertMap(input map[string]mapvalue) map[string]params.SecretProvider {
result := make(map[string]params.SecretProvider)
for k, v := range input {
result[k] = v
}
return result
}

// Kafka output plugin allows to ingest your records into an Apache Kafka service. <br />
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka**
type Kafka struct {
Expand All @@ -35,8 +56,8 @@
// then by default the first topic in the Topics list will indicate the topic to be used.
TopicKey string `json:"topicKey,omitempty"`
// {property} can be any librdkafka properties
Rdkafka map[string]string `json:"rdkafka,omitempty"`
//adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured
Rdkafka map[string]mapvalue `json:"rdkafka,omitempty"`
DynamicTopic *bool `json:"dynamicTopic,omitempty"`
//Fluent Bit queues data into rdkafka library,
//if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records.
Expand Down Expand Up @@ -84,8 +105,8 @@
kvs.Insert("queue_full_retries", fmt.Sprint(*k.QueueFullRetries))
}

kvs.InsertStringMap(k.Rdkafka, func(k, v string) (string, string) {
return fmt.Sprintf("rdkafka.%s", k), v
kvs.InsertMapValMap(convertMap(k.Rdkafka), func(k, v string) (string, params.SecretProvider) {
return fmt.Sprintf("rdkafka.%s", k), mapvalue{StringVal: v}
})

return kvs, nil
Expand Down
32 changes: 0 additions & 32 deletions apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/params/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (

type kvTransformFunc func(string, string) (string, string)

type SecretProvider interface {
GetStringVal() string
}

type kvTransformFunc1 func(string, string) (string, SecretProvider)

type KVs struct {
keys []string
values []string
Expand All @@ -25,6 +31,36 @@ func NewKVs() *KVs {
}
}

func (kvs *KVs) InsertMapValMap(m map[string]SecretProvider, f kvTransformFunc1) {
if len(m) > 0 {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}

sort.Strings(keys)

for _, k := range keys {
v := m[k]
strval := v.GetStringVal()
if f != nil {
transformedKey, transformedVal := f(k, strval)

if transformedVal != nil {
strval = transformedVal.GetStringVal()
} else {
strval = "" // Default to an empty string if transformation returns nil
}

k = transformedKey
}

kvs.Insert(k, strval)

}
}
}

func (kvs *KVs) Insert(key, value string) {
kvs.keys = append(kvs.keys, key)
kvs.values = append(kvs.values, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
42 changes: 40 additions & 2 deletions config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
42 changes: 40 additions & 2 deletions config/crd/bases/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
Loading
Loading