Skip to content

Commit

Permalink
feat: return partial reports without failure (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek authored Aug 11, 2022
1 parent 7c03ed4 commit 20f386a
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 48 deletions.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ require (
sigs.k8s.io/controller-runtime v0.12.3
)

require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
Expand All @@ -27,6 +32,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
25 changes: 25 additions & 0 deletions pkg/forwarders/rawchannelforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package forwarders

import (
"github.com/kong/kubernetes-telemetry/pkg/types"
)

type rawChannelForwarder struct {
ch chan types.Report
}

// NewRawChannelForwarder creates new rawChannelForwarder.
func NewRawChannelForwarder(ch chan types.Report) *rawChannelForwarder {
return &rawChannelForwarder{
ch: ch,
}
}

func (f *rawChannelForwarder) Name() string {
return "LogForwarder"
}

func (f *rawChannelForwarder) Forward(r types.Report) error {
f.ch <- r
return nil
}
33 changes: 8 additions & 25 deletions pkg/forwarders/tlsforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"

"github.com/kong/kubernetes-telemetry/pkg/log"
)

const (
Expand All @@ -20,32 +21,14 @@ var tlsConf = tls.Config{
MaxVersion: tls.VersionTLS13,
}

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)

type tlsForwarder struct {
log logr.Logger
conn *tls.Conn
logger logr.Logger
conn *tls.Conn
}

// NewTLSForwarder creates a TLS forwarder which forwards received serialized reports
// to a TLS endpoint specified by the provided address.
func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder {
func NewTLSForwarder(address string, logger logr.Logger) *tlsForwarder {
conn, err := tls.DialWithDialer(
&net.Dialer{
Timeout: defaultTimeout,
Expand All @@ -55,13 +38,13 @@ func NewTLSForwarder(address string, log logr.Logger) *tlsForwarder {
&tlsConf,
)
if err != nil {
log.V(DebugLevel).Info("failed to connect to reporting server", "error", err)
logger.V(log.DebugLevel).Info("failed to connect to reporting server", "error", err)
return nil
}

return &tlsForwarder{
log: log,
conn: conn,
logger: logger,
conn: conn,
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/log/logp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package log

import "github.com/sirupsen/logrus"

// TODO: Address logging levels and library to be used.
// See: https://github.com/Kong/kubernetes-ingress-controller/issues/1893
const (
logrusrDiff = 4

// InfoLevel is the converted logging level from logrus to go-logr for
// information level logging. Note that the logrusr middleware technically
// flattens all levels prior to this level into this level as well.
InfoLevel = int(logrus.InfoLevel) - logrusrDiff

// DebugLevel is the converted logging level from logrus to go-logr for
// debug level logging.
DebugLevel = int(logrus.DebugLevel) - logrusrDiff

// WarnLevel is the converted logrus level to go-logr for warnings.
WarnLevel = int(logrus.WarnLevel) - logrusrDiff
)
65 changes: 62 additions & 3 deletions pkg/telemetry/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type Forwarder interface {
// serialize the data and then forward it using the provided forwarder.
func NewConsumer(s Serializer, f Forwarder) *consumer {
var (
ch = make(chan types.Report)
done = make(chan struct{})
logger = defaultLogger() // TODO: allow configuration
ch = make(chan types.Report)
done = make(chan struct{})
// TODO: allow configuration: https://github.com/Kong/kubernetes-telemetry/issues/46
logger = defaultLogger()
)

go func() {
Expand Down Expand Up @@ -56,12 +57,70 @@ func NewConsumer(s Serializer, f Forwarder) *consumer {
}
}

// Intake returns a channel on which this consumer will wait for data to consume it.
func (c *consumer) Intake() chan<- types.Report {
return c.ch
}

// Close closes the consumer.
func (c *consumer) Close() {
c.once.Do(func() {
close(c.done)
})
}

type rawConsumer struct {
logger logr.Logger
once sync.Once
ch chan types.Report
done chan struct{}
}

// RawForwarder is used to forward raw, unserialized telemetry reports to configured
// destination(s).
type RawForwarder interface {
Name() string
Forward(types.Report) error
}

// NewRawConsumer creates a new rawconsumer that will use the provided raw forwarder
// to forward received reports.
func NewRawConsumer(f RawForwarder) *rawConsumer {
var (
ch = make(chan types.Report)
done = make(chan struct{})
// TODO: allow configuration: https://github.com/Kong/kubernetes-telemetry/issues/46
logger = defaultLogger()
)

go func() {
for {
select {
case <-done:
return
case r := <-ch:
if err := f.Forward(r); err != nil {
logger.Error(err, "failed to forward report using raw forwarder: %s", f.Name())
}
}
}
}()

return &rawConsumer{
logger: logger,
ch: ch,
done: done,
}
}

// Intake returns a channel on which this consumer will wait for data to consume it.
func (c *rawConsumer) Intake() chan<- types.Report {
return c.ch
}

// Close closes rawconsumer.
func (c *rawConsumer) Close() {
c.once.Do(func() {
close(c.done)
})
}
34 changes: 20 additions & 14 deletions pkg/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/hashicorp/go-multierror"
"github.com/puzpuzpuz/xsync"

"github.com/kong/kubernetes-telemetry/pkg/provider"
"github.com/kong/kubernetes-telemetry/pkg/log"
"github.com/kong/kubernetes-telemetry/pkg/types"
)

Expand Down Expand Up @@ -165,7 +165,14 @@ func (m *manager) workflowsLoop() {
case <-ticker.C:
report, err := m.Execute(context.Background())
if err != nil {
m.logger.Error(err, "error executing workflows")
m.logger.V(log.DebugLevel).
WithValues("error", err.Error()).
Info("error executing workflows")
}

// Continue the execution even if we get an error but account for possibility
// of getting nil reports, in which case move on to the next iteration (tick).
if report == nil {
continue
}

Expand All @@ -179,28 +186,27 @@ func (m *manager) workflowsLoop() {
}

// Execute executes all configures workflows and returns an aggregated report
// from all the underying providers.
// from all the underlying providers.
func (m *manager) Execute(ctx context.Context) (types.Report, error) {
var (
err error
mErr error
report = types.Report{}
)

m.workflows.Range(func(name string, v Workflow) bool {
var r provider.Report
r, err = v.Execute(ctx)
r, err := v.Execute(ctx)
if err != nil {
err = errors.Wrapf(err, "error executing workflow %s", name)
// TODO: return true and don't abort when encountering an error.
// Better to report partial report than nothing. In order to do so
// use an error agreggator like https://github.com/hashicorp/go-multierror.
return false
mErr = multierror.Append(mErr, err)
}

// Add the report regardless if it's partial only omitting empty (nil) reports.
if r != nil {
report[v.Name()] = r
}

report[v.Name()] = r
return true
})
return report, err
return report, mErr
}

// consumerLoop loops over all configured consumers and sends the gathered telemetry
Expand Down
63 changes: 63 additions & 0 deletions pkg/telemetry/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package telemetry

import (
"errors"
"fmt"
"runtime"
"testing"
"time"

"github.com/bombsimon/logrusr/v3"
"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -238,3 +241,63 @@ func TestManagerWithCatalogWorkflows(t *testing.T) {
}, report)
})
}

func TestManagerWithMultilpleWorkflowsOneReturningError(t *testing.T) {
logrusLog := logrus.New()
logrusLog.Level = logrus.DebugLevel
log := logrusr.New(logrusLog)
m, err := NewManager(
OptManagerLogger(log),
OptManagerPeriod(time.Millisecond),
)
require.NoError(t, err)

{
w := NewWorkflow("basic")
{
p, err := provider.NewFixedValueProvider("constant1", provider.Report{
"constant1": "value1",
})
require.NoError(t, err)
w.AddProvider(p)
}

m.AddWorkflow(w)
}
{
w := NewWorkflow("basic_with_error")
{
p, err := provider.NewFunctorProvider("error_provider", func() (provider.Report, error) {
return nil, errors.New("I am an error")
})
require.NoError(t, err)
w.AddProvider(p)
}
{

p, err := provider.NewFixedValueProvider("constant2", provider.Report{
"constant2": "value2",
})
require.NoError(t, err)
w.AddProvider(p)
}

m.AddWorkflow(w)
}

ch := make(chan types.Report)
consumer := NewRawConsumer(forwarders.NewRawChannelForwarder(ch))
require.NoError(t, m.AddConsumer(consumer))
require.NoError(t, m.Start())

report := <-ch
m.Stop()
require.EqualValues(t, types.Report{
"basic": provider.Report{
"constant1": "value1",
},
"basic_with_error": provider.Report{
"constant2": "value2",
},
}, report)
}
Loading

0 comments on commit 20f386a

Please sign in to comment.