Skip to content

Commit

Permalink
feat: initial event sink implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jsiebens committed Feb 11, 2024
1 parent 41b64ee commit a219349
Show file tree
Hide file tree
Showing 12 changed files with 498 additions and 3 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.15.0 // indirect
github.com/containerd/continuity v0.4.3 // indirect
github.com/coreos/go-iptables v0.7.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
Expand Down Expand Up @@ -132,6 +133,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect
github.com/jsimonetti/rtnetlink v1.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
Expand All @@ -145,6 +147,8 @@ require (
github.com/miekg/dns v1.1.57 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc6 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/cilium/ebpf v0.12.3 h1:8ht6F9MquybnY97at+VDZb3eQQr8ev79RueWeVaEcG4=
github.com/cilium/ebpf v0.12.3/go.mod h1:TctK1ivibvI3znr66ljgi4hqOT8EYQjz1KWBfb1UVgM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/v2 v2.12.0 h1:p1k+ysVOZtNiXfijnwB3WqZNA3y2cGOiKQygWkUHCEI=
github.com/cloudevents/sdk-go/v2 v2.12.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/cloudevents/sdk-go/v2 v2.15.0 h1:aKnhLQhyoJXqEECQdOIZnbZ9VupqlidE6hedugDGr+I=
github.com/cloudevents/sdk-go/v2 v2.15.0/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -559,6 +563,7 @@ github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4os
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down Expand Up @@ -640,9 +645,11 @@ github.com/mitchellh/pointerstructure v1.2.1/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8oh
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
Expand Down
34 changes: 34 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ func defaultConfig() *Config {
Logging: Logging{
Level: "info",
},
Events: Events{
Log: EventsLogSink{
Enabled: false,
},
File: EventsFileSink{
Enabled: false,
FileName: "events.log",
},
},
}
}

Expand All @@ -146,6 +155,7 @@ type Config struct {
Auth Auth `yaml:"auth,omitempty" envPrefix:"AUTH_"`
DNS DNS `yaml:"dns,omitempty"`
Logging Logging `yaml:"logging,omitempty" envPrefix:"LOGGING_"`
Events Events `yaml:"events,omitempty" envPrefix:"EVENTS_"`
}

type Tls struct {
Expand All @@ -168,6 +178,30 @@ type Logging struct {
File string `yaml:"file,omitempty" env:"FILE"`
}

type Events struct {
Log EventsLogSink `yaml:"log,omitempty" envPrefix:"LOG_"`
File EventsFileSink `yaml:"file,omitempty" envPrefix:"FILE_"`
Tcp EventsTcpSink `yaml:"tcp,omitempty" envPrefix:"TCP_"`
}

type EventsLogSink struct {
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
}

type EventsFileSink struct {
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
Path string `yaml:"path,omitempty" env:"PATH"`
FileName string `yaml:"name,omitempty" env:"NAME"`
MaxBytes int `yaml:"max_bytes,omitempty" env:"MAX_BYTES"`
MaxDuration time.Duration `yaml:"max_duration,omitempty" env:"MAX_DURATION"`
MaxFiles int `yaml:"max_files,omitempty" env:"MAX_FILES"`
}

type EventsTcpSink struct {
Enabled bool `yaml:"enabled,omitempty" env:"ENABLED"`
Addr string `yaml:"addr,omitempty" env:"ADDR"`
}

type Database struct {
Type string `yaml:"type,omitempty" env:"TYPE"`
Url string `yaml:"url,omitempty" env:"URL"`
Expand Down
81 changes: 81 additions & 0 deletions internal/eventlog/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package eventlog

import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/jsiebens/ionscale/internal/domain"
"math/big"
)

const (
tailnetCreated = "ionscale.tailnet.created"
tailnetDeleted = "ionscale.tailnet.deleted"
nodeCreated = "ionscale.node.created"
)

func TailnetCreated(tailnet *domain.Tailnet, actor *domain.User) cloudevents.Event {
data := &EventData{
Tailnet: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Target: &Target{ID: idToStr(tailnet.ID), Name: tailnet.Name},
Actor: system,
}

if actor != nil {
data.Actor = Actor{ID: idToStr(actor.ID), Name: actor.Name}
}

event := cloudevents.NewEvent()
event.SetType(tailnetCreated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

func MachineCreated(machine *domain.Machine, actor *domain.User) cloudevents.Event {
data := &EventData{
Tailnet: &Target{ID: idToStr(machine.Tailnet.ID), Name: machine.Tailnet.Name},
Target: &Target{ID: idToStr(machine.ID), Name: machine.CompleteName(), Addresses: machine.IPs()},
Actor: UserToActor(actor),
}

event := cloudevents.NewEvent()
event.SetType(nodeCreated)
_ = event.SetData(cloudevents.ApplicationJSON, data)

return event
}

func UserToActor(actor *domain.User) Actor {
if actor == nil {
return system
}

switch actor.UserType {
case domain.UserTypePerson:
return Actor{ID: idToStr(actor.ID), Name: actor.Name}
default:
return system
}
}

type EventData struct {
Tailnet *Target `json:"tailnet,omitempty"`
Target *Target `json:"target,omitempty"`
Actor Actor `json:"actor"`
}

type Target struct {
ID string `json:"id"`
Name string `json:"name"`
Addresses []string `json:"addresses,omitempty"`
}

type Actor struct {
ID string `json:"id,omitempty"`
Name string `json:"name"`
}

func idToStr(id uint64) string {
return big.NewInt(int64(id)).Text(10)
}

var system = Actor{ID: "", Name: "ionscale system"}
133 changes: 133 additions & 0 deletions internal/eventlog/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package eventlog

import (
"bytes"
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/hashicorp/go-multierror"
"github.com/jsiebens/ionscale/internal/config"
"github.com/jsiebens/ionscale/internal/util"
"go.uber.org/zap"
"io"
"os"
"path/filepath"
"sync"
"time"
)

const (
stdout = "/dev/stdout"
stderr = "/dev/stderr"
devnull = "/dev/null"
)

type Events []cloudevents.Event

func (e *Events) Add(event cloudevents.Event) {
x := append(*e, event)
*e = x
}

type Eventer interface {
Send(ctx context.Context, events ...cloudevents.Event) error
}

type eventer struct {
source string
sinks []sink
}

func (e *eventer) Send(ctx context.Context, events ...cloudevents.Event) error {
groupID := util.NextIDString()
now := time.Now()

for _, event := range events {
event.SetSource(e.source)
event.SetID(util.NextIDString())
event.SetTime(now)
event.SetExtension("eventGroupID", groupID)
}

var r *multierror.Error
for _, s := range e.sinks {
r = multierror.Append(r, s.process(ctx, events...))
}

return r.ErrorOrNil()
}

type sink interface {
process(context.Context, ...cloudevents.Event) error
}

var (
_globalMu sync.RWMutex
_globalE Eventer = &eventer{}
)

func Configure(c *config.Config) error {
var sinks []sink

if c.Events.Log.Enabled {
sinks = append(sinks, &zapSink{logger: zap.L().Named("events").WithOptions(zap.AddCallerSkip(3))})
}

if c.Events.File.Enabled {
switch c.Events.File.Path {
case devnull:
// ignore
case stderr:
sinks = append(sinks, &writerSink{w: os.Stderr})
case stdout:
sinks = append(sinks, &writerSink{w: os.Stdout})
default:
abs, err := filepath.Abs(c.Events.File.Path)
if err != nil {
return err
}

sinks = append(sinks, &fileSink{
path: abs,
fileName: c.Events.File.FileName,
maxBytes: c.Events.File.MaxBytes,
maxDuration: c.Events.File.MaxDuration,
maxFiles: c.Events.File.MaxFiles,
})
}
}

_globalMu.Lock()
defer _globalMu.Unlock()
_globalE = &eventer{
source: c.ServerUrl,
sinks: sinks,
}

return nil
}

func Send(ctx context.Context, events ...cloudevents.Event) {
_globalMu.RLock()
l := _globalE
_globalMu.RUnlock()

if err := l.Send(ctx, events...); err != nil {
zap.L().Error("error while processing event", zap.Error(err))
}
}

func writeJSONLine(w io.Writer, events ...cloudevents.Event) (int, error) {
var payload bytes.Buffer

for _, event := range events {
eventJson, err := event.MarshalJSON()
if err != nil {
return 0, err
}

payload.Write(eventJson)
payload.Write([]byte("\n"))
}

return w.Write(payload.Bytes())
}
Loading

0 comments on commit a219349

Please sign in to comment.