Skip to content

Commit

Permalink
Merge pull request #1143 from MarkusFreitag/bugfix-namespaced-filters
Browse files Browse the repository at this point in the history
Bugfix namespaced filters
  • Loading branch information
benjaminhuo authored Apr 25, 2024
2 parents 478b5fd + 4d00ba3 commit 975fea6
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 31 deletions.
38 changes: 8 additions & 30 deletions apis/fluentbit/v1alpha2/filter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package v1alpha2

import (
"bytes"
"crypto/md5"
"fmt"
"reflect"
"sort"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/custom"
"github.com/fluent/fluent-operator/v2/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"sort"
"strings"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -80,32 +78,12 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
if item.Spec.MatchRegex != "" {
buf.WriteString(fmt.Sprintf(" Match_Regex %s\n", utils.GenerateNamespacedMatchRegExpr(item.Namespace, item.Spec.MatchRegex)))
}
for _, filter := range item.Spec.FilterItems {
if filter.Kubernetes != nil {
kubeTagPrefix := filter.Kubernetes.KubeTagPrefix
if kubeTagPrefix == "" {
kubeTagPrefix = "kube.var.log.containers."
}
filter.Kubernetes.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(item.Namespace)), kubeTagPrefix)
if filter.Kubernetes.RegexParser != "" {
filter.Kubernetes.RegexParser = fmt.Sprintf("%s-%x", filter.Kubernetes.RegexParser, md5.Sum([]byte(item.Namespace)))
}
}
if filter.Parser != nil {
parsers := strings.Split(filter.Parser.Parser, ",")
parserString := ""
for i := range parsers {
parsers[i] = strings.Trim(parsers[i], " ")
parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(item.Namespace)))
parserString = parserString + parsers[i] + ","
}
parserString = strings.TrimSuffix(parserString, ",")
filter.Parser.Parser = parserString
}
if filter.CustomPlugin != nil && filter.CustomPlugin.Config != "" {
filter.CustomPlugin.Config = custom.MakeCustomConfigNamespaced(filter.CustomPlugin.Config, item.Namespace)
}

var iface interface{} = p
if f, ok := iface.(plugins.Namespaceable); ok {
f.MakeNamespaced(item.Namespace)
}

kvs, err := p.Params(sl)
if err != nil {
return err
Expand Down
133 changes: 133 additions & 0 deletions apis/fluentbit/v1alpha2/filter_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package v1alpha2

import (
"testing"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/filter"
. "github.com/onsi/gomega"
)

func ptr[T any](v T) *T { return &v }

func TestFilterList_Load(t *testing.T) {
testcases := []struct {
name string
input Filter
expected string
}{
{
name: "a single filteritem",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
},
},
},
expected: `[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
`,
},
{
name: "a single filteritem, with multiple plugins",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Kubernetes: &filter.Kubernetes{
KubeTagPrefix: "custom-tag",
},
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
},
},
},
expected: `[Filter]
Name kubernetes
Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag
[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
`,
},
{
name: "multiple filteritems",
input: Filter{
Spec: FilterSpec{
FilterItems: []FilterItem{
FilterItem{
Kubernetes: &filter.Kubernetes{
KubeTagPrefix: "custom-tag",
},
Parser: &filter.Parser{
KeyName: "log",
Parser: "first-parser",
},
},
FilterItem{
Parser: &filter.Parser{
KeyName: "msg",
Parser: "second-parser",
ReserveData: ptr(true),
},
},
FilterItem{
Parser: &filter.Parser{
KeyName: "msg",
Parser: "third-parser",
ReserveData: ptr(true),
},
},
},
},
},
expected: `[Filter]
Name kubernetes
Kube_Tag_Prefix d41d8cd98f00b204e9800998ecf8427e.custom-tag
[Filter]
Name parser
Key_Name log
Parser first-parser-d41d8cd98f00b204e9800998ecf8427e
[Filter]
Name parser
Key_Name msg
Parser second-parser-d41d8cd98f00b204e9800998ecf8427e
Reserve_Data true
[Filter]
Name parser
Key_Name msg
Parser third-parser-d41d8cd98f00b204e9800998ecf8427e
Reserve_Data true
`,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
g := NewGomegaWithT(t)

sl := plugins.NewSecretLoader(nil, "testnamespace")

fl := FilterList{
Items: make([]Filter, 1),
}
fl.Items[0] = tc.input

renderedFilterList, err := fl.Load(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(renderedFilterList).To(Equal(tc.expected))
})
}
}
9 changes: 8 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/custom/custom_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package custom
import (
"bytes"
"fmt"
"github.com/fluent/fluent-operator/v2/pkg/utils"
"strings"

"github.com/fluent/fluent-operator/v2/pkg/utils"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)
Expand All @@ -28,6 +29,12 @@ func (a *CustomPlugin) Params(_ plugins.SecretLoader) (*params.KVs, error) {
return kvs, nil
}

func (c *CustomPlugin) MakeNamespaced(ns string) {
if c.Config != "" {
c.Config = MakeCustomConfigNamespaced(c.Config, ns)
}
}

func indentation(str string) string {
splits := strings.Split(str, "\n")
var buf bytes.Buffer
Expand Down
11 changes: 11 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"crypto/md5"
"fmt"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
Expand Down Expand Up @@ -189,3 +190,13 @@ func (k *Kubernetes) Params(_ plugins.SecretLoader) (*params.KVs, error) {
}
return kvs, nil
}

func (k *Kubernetes) MakeNamespaced(ns string) {
if k.KubeTagPrefix == "" {
k.KubeTagPrefix = "kube.var.log.containers."
}
k.KubeTagPrefix = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), k.KubeTagPrefix)
if k.RegexParser != "" {
k.RegexParser = fmt.Sprintf("%s-%x", k.RegexParser, md5.Sum([]byte(ns)))
}
}
10 changes: 10 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/filter/parser_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"crypto/md5"
"fmt"
"strings"

Expand Down Expand Up @@ -59,3 +60,12 @@ func (p *Parser) Params(_ plugins.SecretLoader) (*params.KVs, error) {
}
return kvs, nil
}

func (p *Parser) MakeNamespaced(ns string) {
parsers := strings.Split(p.Parser, ",")
for i := range parsers {
parsers[i] = strings.Trim(parsers[i], " ")
parsers[i] = fmt.Sprintf("%s-%x", parsers[i], md5.Sum([]byte(ns)))
}
p.Parser = strings.Join(parsers, ",")
}
12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/filter/rewritetag_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package filter

import (
"crypto/md5"
"fmt"
"strings"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)
Expand Down Expand Up @@ -47,3 +51,11 @@ func (r *RewriteTag) Params(_ plugins.SecretLoader) (*params.KVs, error) {
}
return kvs, nil
}

func (r *RewriteTag) MakeNamespaced(ns string) {
for idx, rule := range r.Rules {
parts := strings.Fields(rule)
parts[2] = fmt.Sprintf("%x.%s", md5.Sum([]byte(ns)), parts[2])
r.Rules[idx] = strings.Join(parts, " ")
}
}
6 changes: 6 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,9 @@ type Plugin interface {
Name() string
Params(SecretLoader) (*params.KVs, error)
}

// The Namespaceable interface defines a method for adding a namespace
// to a plugins identifier.
type Namespaceable interface {
MakeNamespaced(string)
}

0 comments on commit 975fea6

Please sign in to comment.