diff --git a/x-pack/filebeat/input/netflow/decoder/config/config.go b/x-pack/filebeat/input/netflow/decoder/config/config.go index 5297f3c4a31e..8b51336596ea 100644 --- a/x-pack/filebeat/input/netflow/decoder/config/config.go +++ b/x-pack/filebeat/input/netflow/decoder/config/config.go @@ -5,10 +5,10 @@ package config import ( - "io" "time" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" + "github.com/elastic/elastic-agent-libs/logp" ) type ActiveSessionsMetric interface { @@ -19,7 +19,7 @@ type ActiveSessionsMetric interface { // Config stores the configuration used by the NetFlow Collector. type Config struct { protocols []string - logOutput io.Writer + logOutput *logp.Logger expiration time.Duration detectReset bool fields fields.FieldDict @@ -28,21 +28,22 @@ type Config struct { activeSessionsMetric ActiveSessionsMetric } -var defaultCfg = Config{ - protocols: []string{}, - logOutput: io.Discard, - expiration: time.Hour, - detectReset: true, - sharedTemplates: false, - withCache: false, -} - // Defaults returns a configuration object with defaults settings: // - no protocols are enabled. -// - log output is discarded +// - log output is set to the logger that is passed in. // - session expiration is checked once every hour. -func Defaults() Config { - return defaultCfg +// - resets are detected. +// - templates are not shared. +// - cache is disabled. +func Defaults(logger *logp.Logger) Config { + return Config{ + protocols: []string{}, + logOutput: logger, + expiration: time.Hour, + detectReset: true, + sharedTemplates: false, + withCache: false, + } } // WithProtocols modifies an existing configuration object to enable the @@ -52,12 +53,6 @@ func (c *Config) WithProtocols(protos ...string) *Config { return c } -// WithLogOutput sets the output io.Writer for logging. -func (c *Config) WithLogOutput(output io.Writer) *Config { - c.logOutput = output - return c -} - // WithExpiration configures the expiration timeout for sessions and templates. // A value of zero disables expiration. func (c *Config) WithExpiration(timeout time.Duration) *Config { @@ -121,7 +116,7 @@ func (c *Config) Protocols() []string { } // LogOutput returns the io.Writer where logs are to be written. -func (c *Config) LogOutput() io.Writer { +func (c *Config) LogOutput() *logp.Logger { return c.logOutput } diff --git a/x-pack/filebeat/input/netflow/decoder/decoder.go b/x-pack/filebeat/input/netflow/decoder/decoder.go index 0e1ef3df92cc..37b00e5b3565 100644 --- a/x-pack/filebeat/input/netflow/decoder/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/decoder.go @@ -14,6 +14,8 @@ import ( "net" "sync" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" @@ -93,8 +95,8 @@ func (p *Decoder) Read(buf *bytes.Buffer, source net.Addr) (records []record.Rec } // NewConfig returns a new configuration structure to be passed to NewDecoder. -func NewConfig() *config.Config { - cfg := config.Defaults() +func NewConfig(logger *logp.Logger) *config.Config { + cfg := config.Defaults(logger) return &cfg } diff --git a/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go b/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go index c86e97e5d460..2d88eb0bebfc 100644 --- a/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go +++ b/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go @@ -8,43 +8,44 @@ import ( "bytes" "encoding/json" "fmt" - "log" "net" - "os" + + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" ) func main() { - decoder, err := decoder.NewDecoder(decoder.NewConfig(). - WithLogOutput(os.Stderr). + logger := logp.L().Named("netflow") + + decoder, err := decoder.NewDecoder(decoder.NewConfig(logger). WithProtocols("v1", "v5", "v9", "ipfix")) if err != nil { - log.Fatal("Failed creating decoder:", err) + logger.Fatal("Failed creating decoder:", err) } addr, err := net.ResolveUDPAddr("udp", ":2055") if err != nil { - log.Fatal("Failed to resolve address:", err) + logger.Fatal("Failed to resolve address:", err) } server, err := net.ListenUDP("udp", addr) if err != nil { - log.Fatalf("Failed to listen on %v: %v", addr, err) + logger.Fatalf("Failed to listen on %v: %v", addr, err) } defer server.Close() if err = server.SetReadBuffer(1 << 16); err != nil { - log.Fatalf("Failed to set read buffer size for socket: %v", err) + logger.Fatalf("Failed to set read buffer size for socket: %v", err) } - log.Println("Listening on ", server.LocalAddr()) + logger.Debug("Listening on ", server.LocalAddr()) buf := make([]byte, 8192) decBuf := new(bytes.Buffer) for { size, remote, err := server.ReadFromUDP(buf) if err != nil { - log.Println("Error reading from socket:", err) + logger.Debug("Error reading from socket:", err) continue } @@ -52,7 +53,7 @@ func main() { decBuf.Write(buf[:size]) records, err := decoder.Read(decBuf, remote) if err != nil { - log.Printf("warn: Failed reading records from %v: %v\n", remote, err) + logger.Debugf("warn: Failed reading records from %v: %v\n", remote, err) } for _, r := range records { @@ -63,7 +64,7 @@ func main() { "data": r.Fields, }) if err != nil { - log.Fatal(err) + logger.Fatal(err) } fmt.Println(string(evt)) } diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go index b8799c2d3919..c3f3b0669b81 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go @@ -5,8 +5,6 @@ package ipfix import ( - "log" - "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9" @@ -29,7 +27,7 @@ func init() { } func New(config config.Config) protocol.Protocol { - logger := log.New(config.LogOutput(), LogPrefix, 0) + logger := config.LogOutput().Named(LogPrefix) decoder := DecoderIPFIX{ DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()}, } diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go index afed80638c1e..d4b0445d419b 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" @@ -19,6 +21,10 @@ import ( v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9" ) +func init() { + logp.TestingSetup() +} + func TestMessageWithOptions(t *testing.T) { rawString := "" + "000a01e45bf435e1000000a500000000000200480400001000080004000c0004" + @@ -67,7 +73,7 @@ func TestMessageWithOptions(t *testing.T) { "version": uint64(10), }, } - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) flows, err := proto.OnPacket(bytes.NewBuffer(raw), test.MakeAddress(t, "127.0.0.1:1234")) assert.NoError(t, err) if assert.Len(t, flows, 7) { @@ -84,7 +90,7 @@ func TestOptionTemplates(t *testing.T) { key := v9.MakeSessionKey(addr, 1234, false) t.Run("Single options template", func(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) flows, err := proto.OnPacket(test.MakePacket([]uint16{ // Header // Version, Length, Ts, SeqNo, Source @@ -113,7 +119,7 @@ func TestOptionTemplates(t *testing.T) { }) t.Run("Multiple options template", func(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) raw := test.MakePacket([]uint16{ // Header // Version, Count, Ts, SeqNo, Source @@ -151,7 +157,7 @@ func TestOptionTemplates(t *testing.T) { }) t.Run("records discarded", func(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) raw := test.MakePacket([]uint16{ // Header // Version, Count, Ts, SeqNo, Source @@ -193,7 +199,7 @@ func TestOptionTemplates(t *testing.T) { func TestCustomFields(t *testing.T) { addr := test.MakeAddress(t, "127.0.0.1:12345") - conf := config.Defaults() + conf := config.Defaults(logp.L()) conf.WithCustomFields(fields.FieldDict{ fields.Key{EnterpriseID: 0x12345678, FieldID: 33}: &fields.Field{Name: "customField", Decoder: fields.String}, }) diff --git a/x-pack/filebeat/input/netflow/decoder/protocol/registry_test.go b/x-pack/filebeat/input/netflow/decoder/protocol/registry_test.go index b78fe875486f..8b2eb75f63c6 100644 --- a/x-pack/filebeat/input/netflow/decoder/protocol/registry_test.go +++ b/x-pack/filebeat/input/netflow/decoder/protocol/registry_test.go @@ -11,10 +11,16 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" ) +func init() { + logp.TestingSetup() +} + type testProto int func (testProto) Version() uint16 { @@ -61,7 +67,7 @@ func TestRegistry_Get(t *testing.T) { assert.NoError(t, err) gen, err := registry.Get("my_proto") assert.NoError(t, err) - assert.Equal(t, testProto(0), gen(config.Defaults())) + assert.Equal(t, testProto(0), gen(config.Defaults(logp.L()))) }) t.Run("two protocols", func(t *testing.T) { registry := ProtocolRegistry{} @@ -71,10 +77,10 @@ func TestRegistry_Get(t *testing.T) { assert.NoError(t, err) gen, err := registry.Get("my_proto") assert.NoError(t, err) - assert.Equal(t, testProto(1), gen(config.Defaults())) + assert.Equal(t, testProto(1), gen(config.Defaults(logp.L()))) gen, err = registry.Get("other_proto") assert.NoError(t, err) - assert.Equal(t, testProto(2), gen(config.Defaults())) + assert.Equal(t, testProto(2), gen(config.Defaults(logp.L()))) }) t.Run("not registered", func(t *testing.T) { registry := ProtocolRegistry{} diff --git a/x-pack/filebeat/input/netflow/decoder/test/helper.go b/x-pack/filebeat/input/netflow/decoder/test/helper.go index f62d03fa87ad..6041d2e6c80a 100644 --- a/x-pack/filebeat/input/netflow/decoder/test/helper.go +++ b/x-pack/filebeat/input/netflow/decoder/test/helper.go @@ -20,11 +20,6 @@ type TestLogWriter struct { testing.TB } -func (t TestLogWriter) Write(buf []byte) (int, error) { - t.Log(string(buf)) - return len(buf), nil -} - func MakeAddress(t testing.TB, ipPortPair string) net.Addr { ip, portS, err := net.SplitHostPort(ipPortPair) if err != nil { diff --git a/x-pack/filebeat/input/netflow/decoder/v1/v1.go b/x-pack/filebeat/input/netflow/decoder/v1/v1.go index e023341c4ad4..e7ccc6659ba3 100644 --- a/x-pack/filebeat/input/netflow/decoder/v1/v1.go +++ b/x-pack/filebeat/input/netflow/decoder/v1/v1.go @@ -9,7 +9,6 @@ import ( "encoding/binary" "fmt" "io" - "log" "net" "time" @@ -18,6 +17,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" + "github.com/elastic/elastic-agent-libs/logp" ) const ( @@ -52,21 +52,23 @@ var templateV1 = template.Template{ type ReadHeaderFn func(*bytes.Buffer, net.Addr) (int, time.Time, record.Map, error) type NetflowProtocol struct { - logger *log.Logger + logger *logp.Logger flowTemplate *template.Template version uint16 readHeader ReadHeaderFn } func init() { - protocol.Registry.Register(ProtocolName, New) + if err := protocol.Registry.Register(ProtocolName, New); err != nil { + panic(err) + } } func New(config config.Config) protocol.Protocol { - return NewProtocol(ProtocolID, &templateV1, readV1Header, log.New(config.LogOutput(), LogPrefix, 0)) + return NewProtocol(ProtocolID, &templateV1, readV1Header, config.LogOutput().Named(LogPrefix)) } -func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *log.Logger) protocol.Protocol { +func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *logp.Logger) protocol.Protocol { return &NetflowProtocol{ logger: logger, flowTemplate: template, @@ -90,7 +92,7 @@ func (NetflowProtocol) Stop() error { func (p *NetflowProtocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) { numFlows, timestamp, metadata, err := p.readHeader(buf, source) if err != nil { - p.logger.Printf("Failed parsing packet: %v", err) + p.logger.Debugf("Failed parsing packet: %v", err) return nil, fmt.Errorf("error reading netflow header: %w", err) } flows, err = p.flowTemplate.Apply(buf, numFlows) diff --git a/x-pack/filebeat/input/netflow/decoder/v1/v1_test.go b/x-pack/filebeat/input/netflow/decoder/v1/v1_test.go index 8887298c06d0..e88a168fbf56 100644 --- a/x-pack/filebeat/input/netflow/decoder/v1/v1_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v1/v1_test.go @@ -17,10 +17,15 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" template2 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" + "github.com/elastic/elastic-agent-libs/logp" ) +func init() { + logp.TestingSetup() +} + func TestNetflowProtocol_New(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) assert.Nil(t, proto.Start()) assert.Equal(t, uint16(1), proto.Version()) @@ -28,7 +33,7 @@ func TestNetflowProtocol_New(t *testing.T) { } func TestNetflowProtocol_OnPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00010002000000015bf689f605946fb0" + "acd910e5c0a8017b00000000000000000000000e00002cfa" + @@ -105,7 +110,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) { } func TestNetflowProtocol_BadPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00010002000000015bf689f605" raw, err := hex.DecodeString(rawS) diff --git a/x-pack/filebeat/input/netflow/decoder/v5/v5.go b/x-pack/filebeat/input/netflow/decoder/v5/v5.go index 74d4adbb70e0..d96091c8ca97 100644 --- a/x-pack/filebeat/input/netflow/decoder/v5/v5.go +++ b/x-pack/filebeat/input/netflow/decoder/v5/v5.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/binary" "io" - "log" "net" "time" @@ -54,11 +53,13 @@ var templateV5 = template.Template{ } func init() { - protocol.Registry.Register(ProtocolName, New) + if err := protocol.Registry.Register(ProtocolName, New); err != nil { + panic(err) + } } func New(config config.Config) protocol.Protocol { - return v1.NewProtocol(ProtocolID, &templateV5, ReadV5Header, log.New(config.LogOutput(), LogPrefix, 0)) + return v1.NewProtocol(ProtocolID, &templateV5, ReadV5Header, config.LogOutput().Named(LogPrefix)) } type PacketHeader struct { diff --git a/x-pack/filebeat/input/netflow/decoder/v5/v5_test.go b/x-pack/filebeat/input/netflow/decoder/v5/v5_test.go index 9494d482f6d3..821e6248e953 100644 --- a/x-pack/filebeat/input/netflow/decoder/v5/v5_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v5/v5_test.go @@ -13,14 +13,20 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) +func init() { + logp.TestingSetup() +} + func TestNetflowProtocol_New(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) assert.Nil(t, proto.Start()) assert.Equal(t, uint16(5), proto.Version()) @@ -28,7 +34,7 @@ func TestNetflowProtocol_New(t *testing.T) { } func TestNetflowProtocol_OnPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00050002000000015bf68d8b35fcb9780000000000000000" + "acd910e5c0a8017b00000000000000000000000e00002cfa" + @@ -119,7 +125,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) { } func TestNetflowProtocol_BadPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00050002000000015bf689f605" raw, err := hex.DecodeString(rawS) diff --git a/x-pack/filebeat/input/netflow/decoder/v6/v6.go b/x-pack/filebeat/input/netflow/decoder/v6/v6.go index a5d1bc339e97..5949af7960be 100644 --- a/x-pack/filebeat/input/netflow/decoder/v6/v6.go +++ b/x-pack/filebeat/input/netflow/decoder/v6/v6.go @@ -5,8 +5,6 @@ package v6 import ( - "log" - "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" @@ -49,9 +47,11 @@ var templateV6 = template.Template{ } func init() { - protocol.Registry.Register(ProtocolName, New) + if err := protocol.Registry.Register(ProtocolName, New); err != nil { + panic(err) + } } func New(config config.Config) protocol.Protocol { - return v1.NewProtocol(ProtocolID, &templateV6, v5.ReadV5Header, log.New(config.LogOutput(), LogPrefix, 0)) + return v1.NewProtocol(ProtocolID, &templateV6, v5.ReadV5Header, config.LogOutput().Named(LogPrefix)) } diff --git a/x-pack/filebeat/input/netflow/decoder/v6/v6_test.go b/x-pack/filebeat/input/netflow/decoder/v6/v6_test.go index af46896289e2..5703fe523434 100644 --- a/x-pack/filebeat/input/netflow/decoder/v6/v6_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v6/v6_test.go @@ -13,14 +13,20 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) +func init() { + logp.TestingSetup() +} + func TestNetflowProtocol_New(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) assert.Nil(t, proto.Start()) assert.Equal(t, uint16(6), proto.Version()) @@ -28,7 +34,7 @@ func TestNetflowProtocol_New(t *testing.T) { } func TestNetflowProtocol_OnPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00060002000000015bf68d8b35fcb9780000000000000000" + "acd910e5c0a8017b00000000000000000000000e00002cfa" + @@ -121,7 +127,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) { } func TestNetflowProtocol_BadPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00060002000000015bf689f605" raw, err := hex.DecodeString(rawS) diff --git a/x-pack/filebeat/input/netflow/decoder/v7/v7.go b/x-pack/filebeat/input/netflow/decoder/v7/v7.go index 62cbdc56a065..d851d9420784 100644 --- a/x-pack/filebeat/input/netflow/decoder/v7/v7.go +++ b/x-pack/filebeat/input/netflow/decoder/v7/v7.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/binary" "io" - "log" "net" "time" @@ -55,11 +54,13 @@ var v7template = template.Template{ } func init() { - protocol.Registry.Register(ProtocolName, New) + if err := protocol.Registry.Register(ProtocolName, New); err != nil { + panic(err) + } } func New(config config.Config) protocol.Protocol { - return v1.NewProtocol(ProtocolID, &v7template, ReadV7Header, log.New(config.LogOutput(), LogPrefix, 0)) + return v1.NewProtocol(ProtocolID, &v7template, ReadV7Header, config.LogOutput().Named(LogPrefix)) } type PacketHeader struct { diff --git a/x-pack/filebeat/input/netflow/decoder/v7/v7_test.go b/x-pack/filebeat/input/netflow/decoder/v7/v7_test.go index cafdbc36b563..66481394dbc5 100644 --- a/x-pack/filebeat/input/netflow/decoder/v7/v7_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v7/v7_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" @@ -20,7 +22,7 @@ import ( ) func TestNetflowProtocol_New(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) assert.Nil(t, proto.Start()) assert.Equal(t, uint16(7), proto.Version()) @@ -28,7 +30,7 @@ func TestNetflowProtocol_New(t *testing.T) { } func TestNetflowProtocol_OnPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00070002000000015bf68d8b35fcb9780000000000000000" + "acd910e5c0a8017b00000000000000000000000e00002cfa" + @@ -119,7 +121,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) { } func TestNetflowProtocol_BadPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00060002000000015bf689f605" raw, err := hex.DecodeString(rawS) diff --git a/x-pack/filebeat/input/netflow/decoder/v8/v8.go b/x-pack/filebeat/input/netflow/decoder/v8/v8.go index 9fa88ea1c686..06b3ac6c292c 100644 --- a/x-pack/filebeat/input/netflow/decoder/v8/v8.go +++ b/x-pack/filebeat/input/netflow/decoder/v8/v8.go @@ -9,7 +9,6 @@ import ( "encoding/binary" "fmt" "io" - "log" "net" "time" @@ -18,6 +17,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" + "github.com/elastic/elastic-agent-libs/logp" ) const ( @@ -300,16 +300,18 @@ var templates = map[AggType]*template.Template{ } type NetflowV8Protocol struct { - logger *log.Logger + logger *logp.Logger } func init() { - protocol.Registry.Register(ProtocolName, New) + if err := protocol.Registry.Register(ProtocolName, New); err != nil { + panic(err) + } } func New(config config.Config) protocol.Protocol { return &NetflowV8Protocol{ - logger: log.New(config.LogOutput(), LogPrefix, 0), + logger: config.LogOutput().Named(LogPrefix), } } @@ -320,12 +322,12 @@ func (NetflowV8Protocol) Version() uint16 { func (p *NetflowV8Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) { header, err := ReadPacketHeader(buf) if err != nil { - p.logger.Printf("Failed parsing packet: %v", err) + p.logger.Debugf("Failed parsing packet: %v", err) return nil, fmt.Errorf("error reading V8 header: %w", err) } template, found := templates[header.Aggregation] if !found { - p.logger.Printf("Packet from %s uses an unknown V8 aggregation: %d", source, header.Aggregation) + p.logger.Debugf("Packet from %s uses an unknown V8 aggregation: %d", source, header.Aggregation) return nil, fmt.Errorf("unsupported V8 aggregation: %d", header.Aggregation) } metadata := header.GetMetadata(source) diff --git a/x-pack/filebeat/input/netflow/decoder/v8/v8_test.go b/x-pack/filebeat/input/netflow/decoder/v8/v8_test.go index 81ad2437cae9..0624b76db47d 100644 --- a/x-pack/filebeat/input/netflow/decoder/v8/v8_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v8/v8_test.go @@ -14,12 +14,18 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" template2 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) +func init() { + logp.TestingSetup() +} + func TestTemplates(t *testing.T) { for code, template := range templates { if !template2.ValidateTemplate(t, template) { @@ -29,7 +35,7 @@ func TestTemplates(t *testing.T) { } func TestNetflowProtocol_New(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) assert.Nil(t, proto.Start()) assert.Equal(t, uint16(8), proto.Version()) @@ -37,7 +43,7 @@ func TestNetflowProtocol_New(t *testing.T) { } func TestNetflowProtocol_BadPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) rawS := "00080002000000015bf689f605" raw, err := hex.DecodeString(rawS) @@ -50,7 +56,7 @@ func TestNetflowProtocol_BadPacket(t *testing.T) { } func TestNetflowV8Protocol_OnPacket(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) address := test.MakeAddress(t, "127.0.0.1:11111") captureTime, err := time.Parse(time.RFC3339Nano, "2018-11-22T20:53:03.987654321Z") if !assert.NoError(t, err) { diff --git a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go index bd34b424d2f3..d283bab506be 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go @@ -10,10 +10,11 @@ import ( "errors" "fmt" "io" - "log" "net" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" @@ -29,18 +30,18 @@ type Decoder interface { ReadSetHeader(*bytes.Buffer) (SetHeader, error) ReadTemplateSet(setID uint16, buf *bytes.Buffer) ([]*template.Template, error) ReadFieldDefinition(*bytes.Buffer) (field fields.Key, length uint16, err error) - GetLogger() *log.Logger + GetLogger() *logp.Logger GetFields() fields.FieldDict } type DecoderV9 struct { - Logger *log.Logger + Logger *logp.Logger Fields fields.FieldDict } var _ Decoder = (*DecoderV9)(nil) -func (d DecoderV9) GetLogger() *log.Logger { +func (d DecoderV9) GetLogger() *logp.Logger { return d.Logger } @@ -124,10 +125,10 @@ func ReadFields(d Decoder, buf *bytes.Buffer, count int) (record template.Templa if length == template.VariableLength || min <= field.Length && field.Length <= max { field.Info = fieldInfo } else if logger != nil { - logger.Printf("Size of field %s in template is out of bounds (size=%d, min=%d, max=%d)", fieldInfo.Name, field.Length, min, max) + logger.Debugf("Size of field %s in template is out of bounds (size=%d, min=%d, max=%d)", fieldInfo.Name, field.Length, min, max) } } else if logger != nil { - logger.Printf("Field %v in template not found", key) + logger.Debugf("Field %v in template not found", key) } record.Fields[i] = field } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session.go b/x-pack/filebeat/input/netflow/decoder/v9/session.go index e72fa1ab80a6..4a31b16937d8 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session.go @@ -5,12 +5,13 @@ package v9 import ( - "log" "net" "sync" "sync/atomic" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" ) @@ -45,12 +46,12 @@ type SessionState struct { mutex sync.RWMutex Templates map[TemplateKey]*TemplateWrapper lastSequence uint32 - logger *log.Logger + logger *logp.Logger Delete atomic.Bool } // NewSession creates a new session. -func NewSession(logger *log.Logger) *SessionState { +func NewSession(logger *logp.Logger) *SessionState { return &SessionState{ logger: logger, Templates: make(map[TemplateKey]*TemplateWrapper), @@ -59,7 +60,7 @@ func NewSession(logger *log.Logger) *SessionState { // AddTemplate adds the passed template. func (s *SessionState) AddTemplate(t *template.Template) { - s.logger.Printf("state %p addTemplate %d %p", s, t.ID, t) + s.logger.Debugf("state %p addTemplate %d %p", s, t.ID, t) s.mutex.Lock() defer s.mutex.Unlock() s.Templates[TemplateKey(t.ID)] = &TemplateWrapper{Template: t} @@ -94,7 +95,7 @@ func (s *SessionState) ExpireTemplates() (alive int, removed int) { total = len(s.Templates) for _, id := range toDelete { if template, found := s.Templates[id]; found && template.Delete.Load() { - s.logger.Printf("expired template %v", id) + s.logger.Debugf("expired template %v", id) delete(s.Templates, id) removed++ } @@ -125,12 +126,12 @@ func isValidSequence(current, next uint32) bool { type SessionMap struct { mutex sync.RWMutex Sessions map[SessionKey]*SessionState - logger *log.Logger + logger *logp.Logger metric config.ActiveSessionsMetric } // NewSessionMap returns a new SessionMap. -func NewSessionMap(logger *log.Logger, metric config.ActiveSessionsMetric) SessionMap { +func NewSessionMap(logger *logp.Logger, metric config.ActiveSessionsMetric) SessionMap { return SessionMap{ logger: logger, Sessions: make(map[SessionKey]*SessionState), @@ -216,7 +217,7 @@ func (m *SessionMap) CleanupLoop(interval time.Duration, done <-chan struct{}) { case <-t.C: aliveS, removedS, aliveT, removedT := m.cleanup() if removedS > 0 || removedT > 0 { - m.logger.Printf("Expired %d sessions (%d remain) / %d templates (%d remain)", removedS, aliveS, removedT, aliveT) + m.logger.Debugf("Expired %d sessions (%d remain) / %d templates (%d remain)", removedS, aliveS, removedT, aliveT) } } } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go index 8c10b2b98e94..88d38284be8d 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go @@ -5,26 +5,29 @@ package v9 import ( - "io" - "log" "math" "sync" "testing" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) -var logger = log.New(io.Discard, "", 0) +func init() { + logp.TestingSetup() +} func makeSessionKey(t testing.TB, ipPortPair string, domain uint32) SessionKey { return MakeSessionKey(test.MakeAddress(t, ipPortPair), domain, false) } func TestSessionMap_GetOrCreate(t *testing.T) { + var logger = logp.NewLogger("session_map") t.Run("consistent behavior", func(t *testing.T) { sm := NewSessionMap(logger, nil) @@ -101,7 +104,7 @@ func testTemplate(id uint16) *template.Template { } func TestSessionState(t *testing.T) { - logger := log.New(io.Discard, "", 0) + var logger = logp.NewLogger("session_state") t.Run("create and get", func(t *testing.T) { s := NewSession(logger) t1 := testTemplate(1) @@ -133,7 +136,7 @@ func TestSessionState(t *testing.T) { } func TestSessionMap_Cleanup(t *testing.T) { - sm := NewSessionMap(logger, nil) + sm := NewSessionMap(logp.L(), nil) // Session is created k1 := makeSessionKey(t, "127.0.0.1:1234", 1) @@ -180,7 +183,7 @@ func TestSessionMap_Cleanup(t *testing.T) { func TestSessionMap_CleanupLoop(t *testing.T) { timeout := time.Millisecond * 100 - sm := NewSessionMap(log.New(io.Discard, "", 0), nil) + sm := NewSessionMap(logp.NewLogger(""), nil) key := makeSessionKey(t, "127.0.0.1:1", 42) s := sm.GetOrCreate(key) @@ -201,7 +204,7 @@ func TestSessionMap_CleanupLoop(t *testing.T) { } func TestTemplateExpiration(t *testing.T) { - s := NewSession(logger) + s := NewSession(logp.L()) assert.Nil(t, s.GetTemplate(256)) assert.Nil(t, s.GetTemplate(257)) s.AddTemplate(testTemplate(256)) @@ -263,7 +266,7 @@ func TestSessionCheckReset(t *testing.T) { }, } { t.Run(testCase.title, func(t *testing.T) { - s := NewSession(logger) + s := NewSession(logp.L()) s.lastSequence = testCase.current prev, isReset := s.CheckReset(testCase.next) assert.Equal(t, prev, testCase.current) diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9.go b/x-pack/filebeat/input/netflow/decoder/v9/v9.go index 4e67dde701f3..611a9dcba256 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9.go @@ -8,10 +8,11 @@ import ( "bytes" "context" "fmt" - "log" "net" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" @@ -30,7 +31,7 @@ type NetflowV9Protocol struct { ctx context.Context cancel context.CancelFunc decoder Decoder - logger *log.Logger + logger *logp.Logger Session SessionMap timeout time.Duration cache *pendingTemplatesCache @@ -39,15 +40,17 @@ type NetflowV9Protocol struct { } func init() { - _ = protocol.Registry.Register(ProtocolName, New) + if err := protocol.Registry.Register(ProtocolName, New); err != nil { + panic(err) + } } func New(config config.Config) protocol.Protocol { - logger := log.New(config.LogOutput(), LogPrefix, 0) + logger := config.LogOutput().Named(LogPrefix) return NewProtocolWithDecoder(DecoderV9{Logger: logger, Fields: config.Fields()}, config, logger) } -func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol { +func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *logp.Logger) *NetflowV9Protocol { ctx, cancel := context.WithCancel(context.Background()) pd := &NetflowV9Protocol{ ctx: ctx, @@ -94,7 +97,7 @@ func (p *NetflowV9Protocol) Stop() error { func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) { header, payload, numFlowSets, err := p.decoder.ReadPacketHeader(buf) if err != nil { - p.logger.Printf("Unable to read V9 header: %v", err) + p.logger.Debugf("Unable to read V9 header: %v", err) return nil, fmt.Errorf("error reading header: %w", err) } buf = payload @@ -104,10 +107,10 @@ func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows session := p.Session.GetOrCreate(sessionKey) remote := source.String() - p.logger.Printf("Packet from:%s src:%d seq:%d", remote, header.SourceID, header.SequenceNo) + p.logger.Debugf("Packet from:%s src:%d seq:%d", remote, header.SourceID, header.SequenceNo) if p.detectReset { if prev, reset := session.CheckReset(header.SequenceNo); reset { - p.logger.Printf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, prev) + p.logger.Debugf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, prev) } } @@ -117,15 +120,15 @@ func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows break } if buf.Len() < set.BodyLength() { - p.logger.Printf("FlowSet ID %+v overflows packet from %s", set, source) + p.logger.Debugf("FlowSet ID %+v overflows packet from %s", set, source) break } body := bytes.NewBuffer(buf.Next(set.BodyLength())) - p.logger.Printf("FlowSet ID %d length %d", set.SetID, set.BodyLength()) + p.logger.Debugf("FlowSet ID %d length %d", set.SetID, set.BodyLength()) f, err := p.parseSet(set.SetID, sessionKey, session, body) if err != nil { - p.logger.Printf("Error parsing set %d: %v", set.SetID, err) + p.logger.Debugf("Error parsing set %d: %v", set.SetID, err) return nil, fmt.Errorf("error parsing set: %w", err) } flows = append(flows, f...) @@ -152,7 +155,7 @@ func (p *NetflowV9Protocol) parseSet( if p.cache != nil { p.cache.Add(key, buf) } else { - p.logger.Printf("No template for ID %d", setID) + p.logger.Debugf("No template for ID %d", setID) } return nil, nil } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go b/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go index 67212c1e4084..c2fc1c2fa14e 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go @@ -10,17 +10,23 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) +func init() { + logp.TestingSetup() +} + func TestNetflowV9Protocol_ID(t *testing.T) { - assert.Equal(t, ProtocolID, New(config.Defaults()).Version()) + assert.Equal(t, ProtocolID, New(config.Defaults(logp.L())).Version()) } func TestNetflowProtocol_New(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) assert.Nil(t, proto.Start()) assert.Equal(t, uint16(9), proto.Version()) @@ -33,7 +39,7 @@ func TestOptionTemplates(t *testing.T) { key := MakeSessionKey(addr, sourceID, false) t.Run("Single options template", func(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) flows, err := proto.OnPacket(test.MakePacket([]uint16{ // Header // Version, Count, Uptime, Ts, SeqNo, Source @@ -62,7 +68,7 @@ func TestOptionTemplates(t *testing.T) { }) t.Run("Multiple options template", func(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) raw := test.MakePacket([]uint16{ // Header // Version, Count, Uptime, Ts, SeqNo, Source @@ -99,7 +105,7 @@ func TestOptionTemplates(t *testing.T) { }) t.Run("records discarded", func(t *testing.T) { - proto := New(config.Defaults()) + proto := New(config.Defaults(logp.L())) raw := test.MakePacket([]uint16{ // Header // Version, Count, Uptime, Ts, SeqNo, Source @@ -161,8 +167,8 @@ func TestSessionReset(t *testing.T) { 3, 3, } t.Run("Reset disabled", func(t *testing.T) { - cfg := config.Defaults() - cfg.WithSequenceResetEnabled(false).WithLogOutput(test.TestLogWriter{TB: t}) + cfg := config.Defaults(logp.NewLogger("v9_test")) + cfg.WithSequenceResetEnabled(false) proto := New(cfg) flows, err := proto.OnPacket(test.MakePacket(templatePacket), addr) assert.NoError(t, err) @@ -172,8 +178,8 @@ func TestSessionReset(t *testing.T) { assert.Len(t, flows, 1) }) t.Run("Reset enabled", func(t *testing.T) { - cfg := config.Defaults() - cfg.WithSequenceResetEnabled(true).WithLogOutput(test.TestLogWriter{TB: t}) + cfg := config.Defaults(logp.NewLogger("v9_test")) + cfg.WithSequenceResetEnabled(true) proto := New(cfg) flows, err := proto.OnPacket(test.MakePacket(templatePacket), addr) assert.NoError(t, err) @@ -192,8 +198,8 @@ func TestSessionReset(t *testing.T) { tmp[9] = uint16(sourceID & 0xffff) return test.MakePacket(tmp) } - cfg := config.Defaults() - cfg.WithSequenceResetEnabled(true).WithLogOutput(test.TestLogWriter{TB: t}) + cfg := config.Defaults(logp.NewLogger("v9_test")) + cfg.WithSequenceResetEnabled(true) proto := New(cfg) flows, err := proto.OnPacket(mkPack(templatePacket, 1, 1000), addr) assert.NoError(t, err) @@ -213,7 +219,7 @@ func TestSessionReset(t *testing.T) { func TestCustomFields(t *testing.T) { addr := test.MakeAddress(t, "127.0.0.1:12345") - conf := config.Defaults() + conf := config.Defaults(logp.L()) conf.WithCustomFields(fields.FieldDict{ fields.Key{FieldID: 33333}: &fields.Field{Name: "customField", Decoder: fields.String}, }) @@ -276,7 +282,7 @@ func TestSharedTemplates(t *testing.T) { } t.Run("Template sharing enabled", func(t *testing.T) { - cfg := config.Defaults() + cfg := config.Defaults(logp.L()) cfg.WithSharedTemplates(true) proto := New(cfg) flows, err := proto.OnPacket(test.MakePacket(templatePacket), templateAddr) @@ -288,7 +294,7 @@ func TestSharedTemplates(t *testing.T) { }) t.Run("Template sharing disabled", func(t *testing.T) { - cfg := config.Defaults() + cfg := config.Defaults(logp.L()) cfg.WithSharedTemplates(false) proto := New(cfg) flows, err := proto.OnPacket(test.MakePacket(templatePacket), templateAddr) diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index bb4046b74a91..a415f994a9b5 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -129,10 +129,9 @@ func (n *netflowInput) Run(env v2.Context, connector beat.PipelineConnector) err n.metrics = newInputMetrics(n.udpMetrics.Registry()) var err error - n.decoder, err = decoder.NewDecoder(decoder.NewConfig(). + n.decoder, err = decoder.NewDecoder(decoder.NewConfig(n.logger). WithProtocols(n.cfg.Protocols...). WithExpiration(n.cfg.ExpirationTimeout). - WithLogOutput(&logDebugWrapper{Logger: n.logger}). WithCustomFields(n.customFields...). WithSequenceResetEnabled(n.cfg.DetectSequenceReset). WithSharedTemplates(n.cfg.ShareTemplates). @@ -236,26 +235,6 @@ func (n *netflowInput) Run(env v2.Context, connector beat.PipelineConnector) err return nil } -// An adapter so that logp.Logger can be used as a log.Logger. -type logDebugWrapper struct { - sync.Mutex - Logger *logp.Logger - buf []byte -} - -// Write writes messages to the log. -func (w *logDebugWrapper) Write(p []byte) (n int, err error) { - w.Lock() - defer w.Unlock() - n = len(p) - w.buf = append(w.buf, p...) - for endl := bytes.IndexByte(w.buf, '\n'); endl != -1; endl = bytes.IndexByte(w.buf, '\n') { - w.Logger.Debug(string(w.buf[:endl])) - w.buf = w.buf[endl+1:] - } - return n, nil -} - // stop stops the netflow input func (n *netflowInput) stop() { n.mtx.Lock() diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index 65383df4a98b..5bc4763ea7e6 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -49,6 +49,10 @@ const ( datSourceIP = "192.0.2.1" ) +func init() { + logp.TestingSetup() +} + // DatTests specifies the .dat files associated with test cases. type DatTests struct { Tests map[string]TestCase `yaml:"tests"` @@ -289,11 +293,10 @@ func readDatTests(t testing.TB) *DatTests { func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult { t.Helper() - config := decoder.NewConfig(). + config := decoder.NewConfig(logp.NewLogger("netflow_test")). WithProtocols(protocol.Registry.All()...). WithSequenceResetEnabled(false). - WithExpiration(0). - WithLogOutput(test.TestLogWriter{TB: t}) + WithExpiration(0) for _, fieldFile := range testCase.Fields { fields, err := LoadFieldDefinitionsFromFile(filepath.Join(fieldsDir, fieldFile)) @@ -351,12 +354,11 @@ func getFlowsFromPCAP(t testing.TB, name, pcapFile string) TestResult { r, err := pcapgo.NewReader(f) require.NoError(t, err) - config := decoder.NewConfig(). + config := decoder.NewConfig(logp.NewLogger("netflow_test")). WithProtocols(protocol.Registry.All()...). WithSequenceResetEnabled(false). WithExpiration(0). - WithCache(strings.HasSuffix(pcapFile, ".reversed.pcap")). - WithLogOutput(test.TestLogWriter{TB: t}) + WithCache(strings.HasSuffix(pcapFile, ".reversed.pcap")) decoder, err := decoder.NewDecoder(config) if !assert.NoError(t, err) {