diff --git a/kafka/logger.go b/kafka/logger.go index d233cc1a..65caefbc 100644 --- a/kafka/logger.go +++ b/kafka/logger.go @@ -18,6 +18,8 @@ package kafka import ( + "context" + "errors" "net" "time" @@ -35,12 +37,17 @@ type loggerHook struct { // OnBrokerConnect implements the kgo.HookBrokerConnect interface. func (l *loggerHook) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) { if err != nil { - l.logger.Error("failed to connect to broker", + fields := []zap.Field{ zap.Error(err), - zap.String("duration", dialDur.String()), + zap.Duration("event.duration", dialDur), zap.String("host", meta.Host), zap.Int32("port", meta.Port), zap.Stack("stack"), - ) + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + l.logger.Warn("failed to connect to broker", fields...) + return + } + l.logger.Error("failed to connect to broker", fields...) } } diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 5061bc8a..89151a1e 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -22,38 +22,74 @@ import ( "errors" "net" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" ) func TestHookLogsFailedDial(t *testing.T) { - cluster, cfg := newFakeCluster(t) - t.Cleanup(cluster.Close) - - core, logs := observer.New(zap.ErrorLevel) - cfg.Logger = zap.New(core) - // Simulate returning an error when dialing the broker. 
- const errorMsg = "busted" - cfg.Dialer = func(context.Context, string, string) (net.Conn, error) { - return nil, errors.New(errorMsg) + assertLogs := func(t *testing.T, + logs *observer.ObservedLogs, + expectedLevel zapcore.Level, + expectedErr string, + ) { + observedLogs := logs.FilterMessage("failed to connect to broker").TakeAll() + // Franz-go will retry once to connect to the broker, so we might see either one or two log lines. + assert.GreaterOrEqual(t, len(observedLogs), 1, + "expected one or two log lines, got %#v", observedLogs, + ) + // The error message should contain the error message from the dialer. + assert.EqualValues(t, observedLogs[0].ContextMap()["error"], expectedErr) + assert.Contains(t, observedLogs[0].ContextMap(), "event.duration") + assert.Equal(t, observedLogs[0].Level, expectedLevel) } + t.Run("context.Canceled", func(t *testing.T) { + cluster, cfg := newFakeCluster(t) + t.Cleanup(cluster.Close) - // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster - // using the broken dialer. - c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) - require.NoError(t, err) - assert.Error(t, c.Ping(context.Background())) + core, logs := observer.New(zap.WarnLevel) + cfg.Logger = zap.New(core) + ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) + defer cancel() + cfg.Dialer = func(context.Context, string, string) (net.Conn, error) { + <-ctx.Done() + return nil, ctx.Err() + } + // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster + // using the broken dialer. + c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) + require.NoError(t, err) - observedLogs := logs.FilterMessage("failed to connect to broker").TakeAll() - // Franz-go will retry once to connect to the broker, so we might see either one or two log lines. 
- assert.True(t, len(observedLogs) == 1 || len(observedLogs) == 2, - "expected one or two log lines, got %#v", observedLogs) + <-time.After(time.Millisecond) - // The error message should contain the error message from the dialer. - assert.EqualValues(t, observedLogs[0].ContextMap()["error"], errorMsg) - assert.Contains(t, observedLogs[0].ContextMap(), "duration") + // The dialer blocks until ctx times out and returns ctx.Err() (context.DeadlineExceeded), which should be logged as a warning. + assert.Error(t, c.Ping(ctx)) + + assertLogs(t, logs, zap.WarnLevel, context.DeadlineExceeded.Error()) + }) + t.Run("busted dialer", func(t *testing.T) { + cluster, cfg := newFakeCluster(t) + t.Cleanup(cluster.Close) + + core, logs := observer.New(zap.ErrorLevel) + cfg.Logger = zap.New(core) + // Simulate returning an error when dialing the broker. + const errorMsg = "busted" + cfg.Dialer = func(context.Context, string, string) (net.Conn, error) { + return nil, errors.New(errorMsg) + } + + // Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster + // using the broken dialer. + c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") }) + require.NoError(t, err) + assert.Error(t, c.Ping(context.Background())) + + assertLogs(t, logs, zap.ErrorLevel, errorMsg) + }) }