Skip to content

Commit

Permalink
feat: report logs back to the api (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
varnastadeus authored Nov 17, 2021
1 parent 0dc6200 commit 233bf0e
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 66 deletions.
4 changes: 2 additions & 2 deletions actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *service) doWork(ctx context.Context) error {
func (s *service) pollActions(ctx context.Context) ([]*castai.ClusterAction, error) {
ctx, cancel := context.WithTimeout(ctx, s.cfg.PollTimeout)
defer cancel()
actions, err := s.castaiClient.GetActions(ctx, s.cfg.ClusterID)
actions, err := s.castaiClient.GetActions(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *service) ackAction(ctx context.Context, action *castai.ClusterAction, h
return backoff.RetryNotify(func() error {
ctx, cancel := context.WithTimeout(ctx, s.cfg.AckTimeout)
defer cancel()
return s.castaiClient.AckAction(ctx, s.cfg.ClusterID, action.ID, &castai.AckClusterActionRequest{
return s.castaiClient.AckAction(ctx, action.ID, &castai.AckClusterActionRequest{
Error: getHandlerError(handleErr),
})
}, backoff.WithContext(
Expand Down
64 changes: 15 additions & 49 deletions actions/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/goleak"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/castai/mock"
)

func TestMain(m *testing.M) {
Expand All @@ -32,8 +33,8 @@ func TestActions(t *testing.T) {
ClusterID: uuid.New().String(),
}

newTestService := func(handler ActionHandler, telemetryClient castai.Client) Service {
svc := NewService(log, cfg, nil, telemetryClient)
newTestService := func(handler ActionHandler, client castai.Client) Service {
svc := NewService(log, cfg, nil, client)
handlers := svc.(*service).actionHandlers
// Patch handlers with a mock one.
for k := range handlers {
Expand Down Expand Up @@ -66,16 +67,16 @@ func TestActions(t *testing.T) {
},
},
}
telemetryClient := newMockTelemetryClient(apiActions)
client := mock.NewMockAPIClient(apiActions)
handler := &mockAgentActionHandler{}
svc := newTestService(handler, telemetryClient)
svc := newTestService(handler, client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer func() {
cancel()
r.Len(telemetryClient.acks, 3)
r.Equal("a1", telemetryClient.acks[0].actionID)
r.Equal("a2", telemetryClient.acks[1].actionID)
r.Equal("a3", telemetryClient.acks[2].actionID)
r.Len(client.Acks, 3)
r.Equal("a1", client.Acks[0].ActionID)
r.Equal("a2", client.Acks[1].ActionID)
r.Equal("a3", client.Acks[2].ActionID)
}()
svc.Run(ctx)
})
Expand All @@ -90,16 +91,16 @@ func TestActions(t *testing.T) {
},
},
}
telemetryClient := newMockTelemetryClient(apiActions)
client := mock.NewMockAPIClient(apiActions)
handler := &mockAgentActionHandler{err: errors.New("ups")}
svc := newTestService(handler, telemetryClient)
svc := newTestService(handler, client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer func() {
cancel()
r.Empty(telemetryClient.actions)
r.Len(telemetryClient.acks, 1)
r.Equal("a1", telemetryClient.acks[0].actionID)
r.Equal("ups", *telemetryClient.acks[0].err)
r.Empty(client.Actions)
r.Len(client.Acks, 1)
r.Equal("a1", client.Acks[0].ActionID)
r.Equal("ups", *client.Acks[0].Err)
}()
svc.Run(ctx)
})
Expand All @@ -112,38 +113,3 @@ type mockAgentActionHandler struct {
func (m *mockAgentActionHandler) Handle(ctx context.Context, data interface{}) error {
return m.err
}

func newMockTelemetryClient(actions []*castai.ClusterAction) *mockTelemetryClient {
return &mockTelemetryClient{actions: actions}
}

type mockAck struct {
actionID string
err *string
}

type mockTelemetryClient struct {
actions []*castai.ClusterAction
acks []*mockAck
}

func (m *mockTelemetryClient) GetActions(ctx context.Context, clusterID string) ([]*castai.ClusterAction, error) {
return m.actions, nil
}

func (m *mockTelemetryClient) AckAction(ctx context.Context, clusterID, actionID string, req *castai.AckClusterActionRequest) error {
m.removeAckedActions(actionID)

m.acks = append(m.acks, &mockAck{actionID: actionID, err: req.Error})
return nil
}

func (m *mockTelemetryClient) removeAckedActions(actionID string) {
var remaining []*castai.ClusterAction
for _, action := range m.actions {
if action.ID != actionID {
remaining = append(remaining, action)
}
}
m.actions = remaining
}
41 changes: 30 additions & 11 deletions castai/castai_client.go → castai/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ var (
)

type Client interface {
GetActions(ctx context.Context, clusterID string) ([]*ClusterAction, error)
AckAction(ctx context.Context, clusterID, actionID string, req *AckClusterActionRequest) error
GetActions(ctx context.Context) ([]*ClusterAction, error)
AckAction(ctx context.Context, actionID string, req *AckClusterActionRequest) error
SendLogs(ctx context.Context, req *LogEvent) error
}

func NewClient(log *logrus.Logger, rest *resty.Client) Client {
func NewClient(log *logrus.Logger, rest *resty.Client, clusterID string) Client {
return &client{
log: log,
rest: rest,
log: log,
rest: rest,
clusterID: clusterID,
}
}

Expand All @@ -42,16 +44,33 @@ func NewDefaultClient(url, key string, level logrus.Level) *resty.Client {
}

type client struct {
log *logrus.Logger
rest *resty.Client
log *logrus.Logger
rest *resty.Client
clusterID string
}

func (c *client) GetActions(ctx context.Context, clusterID string) ([]*ClusterAction, error) {
func (c *client) SendLogs(ctx context.Context, req *LogEvent) error {
resp, err := c.rest.R().
SetBody(req).
SetContext(ctx).
Post(fmt.Sprintf("/v1/kubernetes/clusters/%s/actions/logs", c.clusterID))

if err != nil {
return fmt.Errorf("sending logs: %w", err)
}
if resp.IsError() {
return fmt.Errorf("sending logs: request error status_code=%d body=%s", resp.StatusCode(), resp.Body())
}

return nil
}

func (c *client) GetActions(ctx context.Context) ([]*ClusterAction, error) {
res := &GetClusterActionsResponse{}
resp, err := c.rest.R().
SetContext(ctx).
SetResult(res).
Get(fmt.Sprintf("/v1/kubernetes/clusters/%s/actions", clusterID))
Get(fmt.Sprintf("/v1/kubernetes/clusters/%s/actions", c.clusterID))
if err != nil {
return nil, fmt.Errorf("failed to request cluster-actions: %w", err)
}
Expand All @@ -61,11 +80,11 @@ func (c *client) GetActions(ctx context.Context, clusterID string) ([]*ClusterAc
return res.Items, nil
}

func (c *client) AckAction(ctx context.Context, clusterID, actionID string, req *AckClusterActionRequest) error {
func (c *client) AckAction(ctx context.Context, actionID string, req *AckClusterActionRequest) error {
resp, err := c.rest.R().
SetContext(ctx).
SetBody(req).
Post(fmt.Sprintf("/v1/kubernetes/clusters/%s/actions/%s/ack", clusterID, actionID))
Post(fmt.Sprintf("/v1/kubernetes/clusters/%s/actions/%s/ack", c.clusterID, actionID))
if err != nil {
return fmt.Errorf("failed to request cluster-actions ack: %v", err)
}
Expand Down
52 changes: 52 additions & 0 deletions castai/mock/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package mock

import (
"context"
"sync"

"github.com/castai/cluster-controller/castai"
)

func NewMockAPIClient(actions []*castai.ClusterAction) *mockClient {
return &mockClient{Actions: actions}
}

type mockAck struct {
ActionID string
Err *string
}

type mockClient struct {
Actions []*castai.ClusterAction
Logs []*castai.LogEvent
Acks []*mockAck
mu sync.Mutex
}

func (m *mockClient) GetActions(_ context.Context) ([]*castai.ClusterAction, error) {
return m.Actions, nil
}

func (m *mockClient) SendLogs(_ context.Context, req *castai.LogEvent) error {
m.mu.Lock()
m.Logs = append(m.Logs, req)
m.mu.Unlock()
return nil
}

func (m *mockClient) AckAction(_ context.Context, actionID string, req *castai.AckClusterActionRequest) error {
m.removeAckedActions(actionID)

m.Acks = append(m.Acks, &mockAck{ActionID: actionID, Err: req.Error})
return nil
}

func (m *mockClient) removeAckedActions(actionID string) {
var remaining []*castai.ClusterAction
for _, action := range m.Actions {
if action.ID != actionID {
remaining = append(remaining, action)
}
}
m.Actions = remaining
}
9 changes: 9 additions & 0 deletions castai/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package castai

import (
"time"

"github.com/sirupsen/logrus"
)

type GetClusterActionsResponse struct {
Expand Down Expand Up @@ -35,6 +37,13 @@ func (c *ClusterAction) Data() interface{} {
return nil
}

type LogEvent struct {
Level string `json:"level"`
Time time.Time `json:"time"`
Message string `json:"message"`
Fields logrus.Fields `json:"fields"`
}

type ActionDeleteNode struct {
NodeName string `json:"nodeName"`
}
Expand Down
81 changes: 81 additions & 0 deletions log/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package log

import (
"context"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"

"github.com/castai/cluster-controller/castai"
)

const (
sendTimeout = 15 * time.Second
)

type Exporter interface {
logrus.Hook
Wait()
}

func NewExporter(logger *logrus.Logger, client castai.Client) Exporter {
return &exporter{
logger: logger,
client: client,
wg: sync.WaitGroup{},
}
}

type exporter struct {
logger *logrus.Logger
client castai.Client
wg sync.WaitGroup
}

func (e *exporter) Levels() []logrus.Level {
return []logrus.Level{
logrus.ErrorLevel,
logrus.FatalLevel,
logrus.PanicLevel,
logrus.InfoLevel,
logrus.WarnLevel,
}
}

func (e *exporter) Fire(entry *logrus.Entry) error {
e.wg.Add(1)

go func(entry *logrus.Entry) {
defer e.wg.Done()
e.sendLogEvent(entry)
}(entry)

return nil
}

func (e *exporter) Wait() {
e.wg.Wait()
}

func (e *exporter) sendLogEvent(log *logrus.Entry) {
ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
defer cancel()

req := &castai.LogEvent{
Level: log.Level.String(),
Time: log.Time,
Message: log.Message,
Fields: log.Data,
}

b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3), ctx)
err := backoff.Retry(func() error {
return e.client.SendLogs(ctx, req)
}, b)

if err != nil {
e.logger.Debugf("sending logs: %v", err)
}
}
42 changes: 42 additions & 0 deletions log/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package log

import (
"testing"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/castai/cluster-controller/castai/mock"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestLogExporter(t *testing.T) {
it := require.New(t)
logger, hook := test.NewNullLogger()
defer hook.Reset()

client := mock.NewMockAPIClient(nil)
e := NewExporter(nil, client)
logger.AddHook(e)
log := logger.WithFields(logrus.Fields{
"cluster_id": "test-cluster",
})

log.Log(logrus.InfoLevel, "deleting kubernetes node")
log.Log(logrus.ErrorLevel, "failed to add node")
log.Log(logrus.DebugLevel, "sending logs")
e.Wait()

it.Len(client.Logs, 2)
it.Equal(client.Logs[0].Message, "deleting kubernetes node")
it.Equal(client.Logs[0].Level, "info")
it.Equal(client.Logs[0].Fields, logrus.Fields{"cluster_id": "test-cluster"})
it.Equal(client.Logs[1].Message, "failed to add node")
it.Equal(client.Logs[1].Level, "error")
it.Equal(client.Logs[1].Fields, logrus.Fields{"cluster_id": "test-cluster"})
}
Loading

0 comments on commit 233bf0e

Please sign in to comment.