Skip to content

Commit

Permalink
feat(fuse): use common events store pkg (#339)
Browse files Browse the repository at this point in the history
c.f., #336

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho authored Feb 3, 2025
1 parent a0ec223 commit 76eed9b
Show file tree
Hide file tree
Showing 10 changed files with 436 additions and 797 deletions.
106 changes: 32 additions & 74 deletions components/fuse/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,53 @@ import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/leptonai/gpud/components"
"github.com/leptonai/gpud/components/common"
events_db "github.com/leptonai/gpud/components/db"
fuse_id "github.com/leptonai/gpud/components/fuse/id"
"github.com/leptonai/gpud/components/fuse/metrics"
"github.com/leptonai/gpud/components/fuse/state"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/log"

"github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func New(ctx context.Context, cfg Config) components.Component {
func New(ctx context.Context, cfg Config) (components.Component, error) {
eventsStore, err := events_db.NewStore(
cfg.Query.State.DBRW,
cfg.Query.State.DBRO,
events_db.CreateDefaultTableName(fuse_id.Name),
events_db.DefaultRetention,
)
if err != nil {
return nil, err
}

cfg.Query.SetDefaultsIfNotSet()
setDefaultPoller(cfg)
setDefaultPoller(cfg, eventsStore)

cctx, ccancel := context.WithCancel(ctx)
getDefaultPoller().Start(cctx, cfg.Query, fuse_id.Name)

return &component{
cfg: cfg,
rootCtx: ctx,
cancel: ccancel,
poller: getDefaultPoller(),
}
cfg: cfg,
ctx: cctx,
cancel: ccancel,
poller: getDefaultPoller(),
eventsStore: eventsStore,
}, nil
}

var _ components.Component = (*component)(nil)

type component struct {
cfg Config
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
gatherer prometheus.Gatherer
cfg Config
ctx context.Context
cancel context.CancelFunc
poller query.Poller
eventsStore events_db.Store
gatherer prometheus.Gatherer
}

// Name returns the identifier of the fuse component (fuse_id.Name).
func (c *component) Name() string {
	return fuse_id.Name
}
Expand Down Expand Up @@ -86,61 +93,8 @@ func (c *component) States(ctx context.Context) ([]components.State, error) {
}, nil
}

const (
EventNameFuseConnections = "fuse_connections"

EventKeyUnixSeconds = "unix_seconds"
EventKeyData = "data"
EventKeyEncoding = "encoding"
EventValueEncodingJSON = "json"
)

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
events, err := state.ReadEvents(ctx, c.cfg.Query.State.DBRO, state.WithSince(since))
if err != nil {
return nil, err
}

if len(events) == 0 {
log.Logger.Debugw("no event found", "component", c.Name(), "since", humanize.Time(since))
return nil, nil
}

log.Logger.Debugw("found events", "component", c.Name(), "since", humanize.Time(since), "count", len(events))
convertedEvents := make([]components.Event, 0, len(events))
for _, event := range events {
msgs := []string{}
if event.CongestedPercentAgainstThreshold > c.cfg.CongestedPercentAgainstThreshold {
msgs = append(msgs, fmt.Sprintf("congested percent against threshold %.2f exceeds threshold %.2f", event.CongestedPercentAgainstThreshold, c.cfg.CongestedPercentAgainstThreshold))
}
if event.MaxBackgroundPercentAgainstThreshold > c.cfg.MaxBackgroundPercentAgainstThreshold {
msgs = append(msgs, fmt.Sprintf("max background percent against threshold %.2f exceeds threshold %.2f", event.MaxBackgroundPercentAgainstThreshold, c.cfg.MaxBackgroundPercentAgainstThreshold))
}
if len(msgs) == 0 {
continue
}

eb, err := event.JSON()
if err != nil {
continue
}

convertedEvents = append(convertedEvents, components.Event{
Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0).UTC()},
Name: EventNameFuseConnections,
Type: common.EventTypeCritical,
Message: strings.Join(msgs, ", "),
ExtraInfo: map[string]string{
EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10),
EventKeyData: string(eb),
EventKeyEncoding: EventValueEncodingJSON,
},
})
}
if len(convertedEvents) == 0 {
return nil, nil
}
return convertedEvents, nil
return c.eventsStore.Get(ctx, since)
}

func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) {
Expand Down Expand Up @@ -168,9 +122,13 @@ func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.
// Close shuts the component down. It invokes the stored cancel function,
// stops the poller registered under fuse_id.Name, and closes the events
// store, in that order. It always returns nil.
func (c *component) Close() error {
log.Logger.Debugw("closing component")

// invoke the component's stored context.CancelFunc before stopping the poller
c.cancel()

// safe to call stop multiple times
c.poller.Stop(fuse_id.Name)

// NOTE(review): nothing returned by eventsStore.Close is checked here;
// confirm whether the store's Close can fail and should be surfaced.
c.eventsStore.Close()

return nil
}

Expand Down
57 changes: 41 additions & 16 deletions components/fuse/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package fuse

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/leptonai/gpud/components"
"github.com/leptonai/gpud/components/common"
events_db "github.com/leptonai/gpud/components/db"
fuse_id "github.com/leptonai/gpud/components/fuse/id"
"github.com/leptonai/gpud/components/fuse/metrics"
"github.com/leptonai/gpud/components/fuse/state"
components_metrics "github.com/leptonai/gpud/components/metrics"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/pkg/fuse"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Output struct {
Expand All @@ -23,12 +28,12 @@ var (
)

// only set once since it relies on the kube client and specific port
func setDefaultPoller(cfg Config) {
func setDefaultPoller(cfg Config, eventsStore events_db.Store) {
defaultPollerOnce.Do(func() {
defaultPoller = query.New(
fuse_id.Name,
cfg.Query,
CreateGet(cfg),
CreateGet(cfg, eventsStore),
nil,
)
})
Expand All @@ -38,7 +43,7 @@ func getDefaultPoller() query.Poller {
return defaultPoller
}

func CreateGet(cfg Config) query.GetFunc {
func CreateGet(cfg Config, eventsStore events_db.Store) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
defer func() {
if e != nil {
Expand All @@ -64,27 +69,47 @@ func CreateGet(cfg Config) query.GetFunc {
continue
}

prev, err := state.FindEvent(ctx, cfg.Query.State.DBRO, now.Unix(), info.DeviceName)
if err != nil {
if err := metrics.SetConnectionsCongestedPercent(ctx, info.DeviceName, info.CongestedPercent, now); err != nil {
return nil, err
}
if err := metrics.SetConnectionsMaxBackgroundPercent(ctx, info.DeviceName, info.MaxBackgroundPercent, now); err != nil {
return nil, err
}
if prev == nil {

msgs := []string{}
if info.CongestedPercent > cfg.CongestedPercentAgainstThreshold {
msgs = append(msgs, fmt.Sprintf("congested percent against threshold %.2f exceeds threshold %.2f", info.CongestedPercent, cfg.CongestedPercentAgainstThreshold))
}
if info.MaxBackgroundPercent > cfg.MaxBackgroundPercentAgainstThreshold {
msgs = append(msgs, fmt.Sprintf("max background percent against threshold %.2f exceeds threshold %.2f", info.MaxBackgroundPercent, cfg.MaxBackgroundPercentAgainstThreshold))
}
if len(msgs) == 0 {
continue
}

if err := state.InsertEvent(ctx, cfg.Query.State.DBRW, state.Event{
UnixSeconds: now.Unix(),
DeviceName: info.DeviceName,
CongestedPercentAgainstThreshold: info.CongestedPercent,
MaxBackgroundPercentAgainstThreshold: info.MaxBackgroundPercent,
}); err != nil {
return nil, err
ib, err := info.JSON()
if err != nil {
continue
}
ev := components.Event{
Time: metav1.Time{Time: now.UTC()},
Name: "fuse_connections",
Type: common.EventTypeCritical,
Message: info.DeviceName + ": " + strings.Join(msgs, ", "),
ExtraInfo: map[string]string{
"data": string(ib),
"encoding": "json",
},
}

if err := metrics.SetConnectionsCongestedPercent(ctx, info.DeviceName, info.CongestedPercent, now); err != nil {
found, err := eventsStore.Find(ctx, ev)
if err != nil {
return nil, err
}
if err := metrics.SetConnectionsMaxBackgroundPercent(ctx, info.DeviceName, info.MaxBackgroundPercent, now); err != nil {
if found == nil {
continue
}
if err := eventsStore.Insert(ctx, ev); err != nil {
return nil, err
}

Expand Down
125 changes: 125 additions & 0 deletions components/fuse/component_output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package fuse

import (
"context"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/leptonai/gpud/components/common"
events_db "github.com/leptonai/gpud/components/db"
"github.com/leptonai/gpud/pkg/sqlite"
)

// TestGet runs the get function built by CreateGet once, with both
// thresholds set to 90, against a fresh test database and expects the call
// to complete without error. The test is skipped on non-Linux platforms.
func TestGet(t *testing.T) {
	if runtime.GOOS != "linux" {
		t.Skip("skipping on non-linux")
	}

	dbRW, dbRO, cleanup := sqlite.OpenTestDB(t)
	defer cleanup()

	// Use require for fatal checks, matching the other tests in this file.
	eventsStore, err := events_db.NewStore(dbRW, dbRO, "test", 0)
	require.NoError(t, err, "failed to create events store")
	defer eventsStore.Close()

	getFunc := CreateGet(Config{
		CongestedPercentAgainstThreshold:     90,
		MaxBackgroundPercentAgainstThreshold: 90,
	}, eventsStore)

	// Bound the call so a stuck query cannot hang the test run.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	_, err = getFunc(ctx)
	require.NoError(t, err, "failed to get events")
}

// TestCreateGetWithThresholds runs the get function with both thresholds set
// to 10 and then validates the shape of every event read back from the
// store. The per-event assertions do not run when the store returns no
// events. The test is skipped on non-Linux platforms.
func TestCreateGetWithThresholds(t *testing.T) {
	if runtime.GOOS != "linux" {
		t.Skip("skipping on non-linux")
	}

	dbRW, dbRO, cleanup := sqlite.OpenTestDB(t)
	defer cleanup()

	store, err := events_db.NewStore(dbRW, dbRO, "test", 0)
	require.NoError(t, err)
	defer store.Close()

	// Thresholds are deliberately low so that events can be triggered.
	get := CreateGet(Config{
		CongestedPercentAgainstThreshold:     10,
		MaxBackgroundPercentAgainstThreshold: 10,
	}, store)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	res, err := get(ctx)
	require.NoError(t, err)

	out, isOutput := res.(*Output)
	require.True(t, isOutput)
	assert.NotNil(t, out)
	assert.NotNil(t, out.ConnectionInfos)

	// Read back everything stored within the last hour.
	since := time.Now().Add(-1 * time.Hour)
	recorded, err := store.Get(ctx, since)
	require.NoError(t, err)

	for i := range recorded {
		ev := recorded[i]
		assert.Equal(t, "fuse_connections", ev.Name)
		assert.Equal(t, common.EventTypeCritical, ev.Type)
		assert.Contains(t, ev.Message, "percent against threshold")
		assert.NotEmpty(t, ev.ExtraInfo["data"])
		assert.Equal(t, "json", ev.ExtraInfo["encoding"])
	}
}

// TestCreateGetDeduplication runs the get function once, with both
// thresholds set to 90, and checks that no device name appears more than
// once in the returned Output.ConnectionInfos. The test is skipped on
// non-Linux platforms.
func TestCreateGetDeduplication(t *testing.T) {
	if runtime.GOOS != "linux" {
		t.Skip("skipping on non-linux")
	}

	dbRW, dbRO, cleanup := sqlite.OpenTestDB(t)
	defer cleanup()

	store, err := events_db.NewStore(dbRW, dbRO, "test", 0)
	require.NoError(t, err)
	defer store.Close()

	get := CreateGet(Config{
		CongestedPercentAgainstThreshold:     90,
		MaxBackgroundPercentAgainstThreshold: 90,
	}, store)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	res, err := get(ctx)
	require.NoError(t, err)

	out, isOutput := res.(*Output)
	require.True(t, isOutput)

	// Track every device name seen so far; a repeat is reported as an error.
	seen := make(map[string]struct{})
	for _, ci := range out.ConnectionInfos {
		name := ci.DeviceName
		if _, dup := seen[name]; dup {
			t.Errorf("Duplicate device name found: %s", name)
		}
		seen[name] = struct{}{}
	}
}
Loading

0 comments on commit 76eed9b

Please sign in to comment.