diff --git a/components/accelerator/nvidia/bad-envs/component.go b/components/accelerator/nvidia/bad-envs/component.go index 316ddb29..3a2f6f26 100644 --- a/components/accelerator/nvidia/bad-envs/component.go +++ b/components/accelerator/nvidia/bad-envs/component.go @@ -18,14 +18,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, bad_envs_id.Name) return &component{ diff --git a/components/accelerator/nvidia/clock-speed/component.go b/components/accelerator/nvidia/clock-speed/component.go index 08e1c82d..ab9b697b 100644 --- a/components/accelerator/nvidia/clock-speed/component.go +++ b/components/accelerator/nvidia/clock-speed/component.go @@ -22,14 +22,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_clock_speed_id.Name) return &component{ diff --git a/components/accelerator/nvidia/ecc/component.go b/components/accelerator/nvidia/ecc/component.go index 85445f11..3022f6bb 100644 --- a/components/accelerator/nvidia/ecc/component.go +++ b/components/accelerator/nvidia/ecc/component.go @@ -22,14 +22,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_ecc_id.Name) return &component{ diff --git a/components/accelerator/nvidia/error-xid-sxid/component.go b/components/accelerator/nvidia/error-xid-sxid/component.go index 16db976a..1dd50ff0 100644 --- a/components/accelerator/nvidia/error-xid-sxid/component.go +++ b/components/accelerator/nvidia/error-xid-sxid/component.go @@ -26,14 +26,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { // this starts the Xid poller via "nvml.StartDefaultInstance" cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_error_xid_sxid_id.Name) return &component{ diff --git a/components/accelerator/nvidia/error/component.go b/components/accelerator/nvidia/error/component.go index dce7f56a..eefe5ee2 100644 --- a/components/accelerator/nvidia/error/component.go +++ b/components/accelerator/nvidia/error/component.go @@ -19,14 +19,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/fabric-manager/component.go b/components/accelerator/nvidia/fabric-manager/component.go index d0e9f5b4..6aa6facb 100644 --- a/components/accelerator/nvidia/fabric-manager/component.go +++ b/components/accelerator/nvidia/fabric-manager/component.go @@ -22,15 +22,9 @@ func New(ctx context.Context, cfg Config) (components.Component, error) { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Log.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Log.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) - nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) + if nvidia_query.GetDefaultPoller() != nil { + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) + } if err := cfg.Log.Validate(); err != nil { ccancel() diff --git a/components/accelerator/nvidia/gsp-firmware-mode/component.go b/components/accelerator/nvidia/gsp-firmware-mode/component.go index ec33b966..eb48a47e 100644 --- a/components/accelerator/nvidia/gsp-firmware-mode/component.go +++ b/components/accelerator/nvidia/gsp-firmware-mode/component.go @@ -18,14 +18,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_gsp_firmware_mode_id.Name) return &component{ diff --git a/components/accelerator/nvidia/hw-slowdown/component.go b/components/accelerator/nvidia/hw-slowdown/component.go index 569eba95..4251f722 100644 --- a/components/accelerator/nvidia/hw-slowdown/component.go +++ b/components/accelerator/nvidia/hw-slowdown/component.go @@ -5,22 +5,20 @@ import ( "context" "database/sql" "fmt" - "strings" "time" "github.com/leptonai/gpud/components" nvidia_common "github.com/leptonai/gpud/components/accelerator/nvidia/common" nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query" nvidia_query_metrics_clock "github.com/leptonai/gpud/components/accelerator/nvidia/query/metrics/clock" "github.com/leptonai/gpud/components/common" + events_db "github.com/leptonai/gpud/components/db" "github.com/leptonai/gpud/components/query" "github.com/leptonai/gpud/log" "github.com/dustin/go-humanize" "github.com/prometheus/client_golang/prometheus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -33,18 +31,10 @@ const ( DefaultStateHWSlowdownEventsThresholdFrequencyPerMinute = 0.6 ) -func New(ctx context.Context, cfg nvidia_common.Config) components.Component { +func New(ctx context.Context, cfg nvidia_common.Config, eventsStore events_db.Store) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_hw_slowdown_id.Name) return &component{ @@ -55,21 +45,7 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cancel: ccancel, poller: nvidia_query.GetDefaultPoller(), - readEvents: func(ctx context.Context, since time.Time) ([]nvidia_hw_slowdown_state.Event, error) { - // the default nvidia poller persists the events to the storage - // so we can just read from the storage - return nvidia_hw_slowdown_state.ReadEvents( - ctx, - cfg.Query.State.DBRO, - - nvidia_hw_slowdown_state.WithSince(since), - - // in order to dedup nvidia-smi events and prioritize nvml events - // otherwise, we have deduplicate objects from nvml and nvidia-smi - // deprecate this once we removed nvidia-smi dependency - nvidia_hw_slowdown_state.WithDedupDataSource(true), - ) - }, + eventsStore: eventsStore, } } @@ -84,7 +60,7 @@ type component struct { poller query.Poller gatherer prometheus.Gatherer - readEvents func(ctx context.Context, since time.Time) ([]nvidia_hw_slowdown_state.Event, error) + eventsStore events_db.Store } func (c *component) Name() string { return nvidia_hw_slowdown_id.Name } @@ -108,7 +84,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { since := time.Now().UTC().Add(-c.stateHWSlowdownEvaluationWindow) - events, err := c.readEvents(ctx, since) + events, err := c.eventsStore.Get(ctx, since) if err != nil { return nil, err } @@ -125,7 +101,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { eventsByMinute := make(map[int]struct{}) for _, event := range events { - min := int(event.Timestamp / 60) // unix seconds to minutes + min := int(event.Time.Unix() / 60) // unix seconds to minutes eventsByMinute[min] = struct{}{} } @@ -167,30 +143,7 @@ const ( ) func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) { - events, err := c.readEvents(ctx, since) - if err != nil { - return nil, err - } - - if len(events) == 0 { - log.Logger.Debugw("no event found for /events", "component", c.Name(), "since", humanize.Time(since)) - return nil, nil - } - - log.Logger.Debugw("found events", "component", c.Name(), "since", humanize.Time(since), "count", len(events)) - convertedEvents := make([]components.Event, 0, len(events)) - for _, event := range events { - convertedEvents = append(convertedEvents, components.Event{ - Time: metav1.Time{Time: time.Unix(event.Timestamp, 0).UTC()}, - Name: EventNameHWSlowdown, - Type: common.EventTypeWarning, - Message: strings.Join(event.Reasons, ", "), - ExtraInfo: map[string]string{ - EventKeyGPUUUID: event.GPUUUID, - }, - }) - } - return convertedEvents, nil + return c.eventsStore.Get(ctx, since) } func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) { diff --git a/components/accelerator/nvidia/hw-slowdown/component_test.go b/components/accelerator/nvidia/hw-slowdown/component_test.go index 234cab79..3e5969a8 100644 --- a/components/accelerator/nvidia/hw-slowdown/component_test.go +++ b/components/accelerator/nvidia/hw-slowdown/component_test.go @@ -5,8 +5,15 @@ import ( "testing" "time" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + "github.com/leptonai/gpud/components/db" + events_db "github.com/leptonai/gpud/components/db" "github.com/leptonai/gpud/pkg/sqlite" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestComponentStates(t *testing.T) { @@ -18,7 +25,7 @@ func TestComponentStates(t *testing.T) { name string window time.Duration thresholdPerMinute float64 - insertedEvent []nvidia_hw_slowdown_state.Event + insertedEvent []components.Event expectedStates int expectHealthy bool }{ @@ -26,18 +33,52 @@ func TestComponentStates(t *testing.T) { name: "single event within window", window: 10 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{{Timestamp: now.Add(-5 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}}, - expectedStates: 1, - expectHealthy: true, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-5 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + }, + expectedStates: 1, + expectHealthy: true, }, { name: "multiple events within window but below threshold", window: 10 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-5 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-1"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-2"}}, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-5 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, }, expectedStates: 1, expectHealthy: true, @@ -46,11 +87,43 @@ func TestComponentStates(t *testing.T) { name: "events above threshold", window: 5 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-1"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-2"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-3"}}, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, }, expectedStates: 1, expectHealthy: false, @@ -59,26 +132,155 @@ func TestComponentStates(t *testing.T) { name: "events above threshold with multiple GPUs", window: 5 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, - - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, - - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, - - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, + insertedEvent: []components.Event{ + // GPU 0-3 events at -4 minutes + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-3", + }, + }, + // GPU 0-3 events at -3 minutes + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-3", + }, + }, + // GPU 0-3 events at -2 minutes + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-3", + }, + }, + // GPU 0-3 events at -1 minutes + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-3", + }, + }, }, expectedStates: 1, expectHealthy: false, @@ -87,9 +289,25 @@ func TestComponentStates(t *testing.T) { name: "events outside window", window: 5 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-10 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}, - {Timestamp: now.Add(-8 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-1"}}, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-10 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-8 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + }, }, expectedStates: 1, expectHealthy: true, @@ -98,7 +316,7 @@ func TestComponentStates(t *testing.T) { name: "no events", window: 10 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{}, + insertedEvent: []components.Event{}, expectedStates: 1, expectHealthy: true, }, @@ -113,42 +331,152 @@ func TestComponentStates(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - if err := nvidia_hw_slowdown_state.CreateTable(ctx, dbRW); err != nil { - t.Fatalf("failed to create table: %v", err) - } + store, err := db.NewStore(dbRW, dbRO, "test_events", 0) + assert.NoError(t, err) + defer store.Close() if len(tc.insertedEvent) > 0 { for _, event := range tc.insertedEvent { - if err := nvidia_hw_slowdown_state.InsertEvent(ctx, dbRW, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } + err := store.Insert(ctx, event) + assert.NoError(t, err) } } c := &component{ stateHWSlowdownEvaluationWindow: tc.window, stateHWSlowdownEventsThresholdFrequencyPerMinute: tc.thresholdPerMinute, - readEvents: func(ctx context.Context, since time.Time) ([]nvidia_hw_slowdown_state.Event, error) { - return nvidia_hw_slowdown_state.ReadEvents( - ctx, - dbRO, - nvidia_hw_slowdown_state.WithSince(now.Add(-tc.window)), - nvidia_hw_slowdown_state.WithDedupDataSource(true), - ) - }, + eventsStore: store, } states, err := c.States(ctx) - if err != nil { - t.Fatalf("failed to get states: %v", err) + assert.NoError(t, err) + assert.Equal(t, tc.expectedStates, len(states)) + + if len(states) > 0 { + assert.Equal(t, tc.expectHealthy, states[0].Healthy) + } + }) + } +} + +func TestComponentRegisterCollectors(t *testing.T) { + t.Parallel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + reg := prometheus.NewRegistry() + c := &component{} + + err := c.RegisterCollectors(reg, dbRW, dbRO, "test_metrics") + assert.NoError(t, err) + assert.Equal(t, reg, c.gatherer) +} + +func TestComponentStatesEdgeCases(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + window time.Duration + thresholdPerMinute float64 + setupStore func(store events_db.Store, ctx context.Context) error + expectError bool + expectedStates int + expectHealthy bool + }{ + { + name: "zero evaluation window", + window: 0, + thresholdPerMinute: 0.6, + setupStore: func(store events_db.Store, ctx context.Context) error { return nil }, + expectError: false, + expectedStates: 1, + expectHealthy: true, + }, + { + name: "negative evaluation window", + window: -10 * time.Minute, + thresholdPerMinute: 0.6, + setupStore: func(store events_db.Store, ctx context.Context) error { return nil }, + expectError: false, + expectedStates: 1, + expectHealthy: true, + }, + { + name: "zero threshold", + window: 10 * time.Minute, + thresholdPerMinute: 0, + setupStore: func(store events_db.Store, ctx context.Context) error { + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC().Add(-5 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + } + return store.Insert(ctx, event) + }, + expectError: false, + expectedStates: 1, + expectHealthy: false, + }, + { + name: "negative threshold", + window: 10 * time.Minute, + thresholdPerMinute: -0.6, + setupStore: func(store events_db.Store, ctx context.Context) error { + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC().Add(-5 * time.Minute)}, + Name: EventNameHWSlowdown, + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + EventKeyGPUUUID: "gpu-0", + }, + } + return store.Insert(ctx, event) + }, + expectError: false, + expectedStates: 1, + expectHealthy: false, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := db.NewStore(dbRW, dbRO, "test_events", 0) + assert.NoError(t, err) + defer store.Close() + + err = tc.setupStore(store, ctx) + assert.NoError(t, err) + + c := &component{ + stateHWSlowdownEvaluationWindow: tc.window, + stateHWSlowdownEventsThresholdFrequencyPerMinute: tc.thresholdPerMinute, + eventsStore: store, } - if len(states) != tc.expectedStates { - t.Fatalf("expected %d states, got %d", tc.expectedStates, len(states)) + states, err := c.States(ctx) + if tc.expectError { + assert.Error(t, err) + return } - if len(states) > 0 && states[0].Healthy != tc.expectHealthy { - t.Errorf("expected healthy=%v, got %v", tc.expectHealthy, states[0].Healthy) + assert.NoError(t, err) + assert.Equal(t, tc.expectedStates, len(states)) + if len(states) > 0 { + assert.Equal(t, tc.expectHealthy, states[0].Healthy) } }) } diff --git a/components/accelerator/nvidia/hw-slowdown/state/options.go b/components/accelerator/nvidia/hw-slowdown/state/options.go deleted file mode 100644 index 6802266e..00000000 --- a/components/accelerator/nvidia/hw-slowdown/state/options.go +++ /dev/null @@ -1,77 +0,0 @@ -package state - -import ( - "errors" - "time" -) - -var ErrInvalidLimit = errors.New("limit must be greater than or equal to 0") - -type Op struct { - sinceUnixSeconds int64 - beforeUnixSeconds int64 - sortUnixSecondsAscOrder bool - limit int - dedupDataSource bool -} - -type OpOption func(*Op) - -func (op *Op) applyOpts(opts []OpOption) error { - for _, opt := range opts { - opt(op) - } - - if op.limit < 0 { - return ErrInvalidLimit - } - - return nil -} - -// WithSince sets the since timestamp for the select queries. -// If not specified, it returns all events. -func WithSince(t time.Time) OpOption { - return func(op *Op) { - op.sinceUnixSeconds = t.Unix() - } -} - -// WithBefore sets the before timestamp for the delete queries. -// If not specified, it deletes all events. -func WithBefore(t time.Time) OpOption { - return func(op *Op) { - op.beforeUnixSeconds = t.Unix() - } -} - -// WithSortUnixSecondsAscendingOrder sorts the events by unix_seconds in ascending order, -// meaning its read query returns the oldest events first. -func WithSortUnixSecondsAscendingOrder() OpOption { - return func(op *Op) { - op.sortUnixSecondsAscOrder = true - } -} - -// WithSortUnixSecondsDescendingOrder sorts the events by unix_seconds in descending order, -// meaning its read query returns the newest events first. -func WithSortUnixSecondsDescendingOrder() OpOption { - return func(op *Op) { - op.sortUnixSecondsAscOrder = false - } -} - -func WithLimit(limit int) OpOption { - return func(op *Op) { - op.limit = limit - } -} - -// Set true to deduplicate events by data sources. -// Meaning, out of nvml and nvidia-smi, if there are events at the same time, -// only the one from nvml will be kept. -func WithDedupDataSource(dedup bool) OpOption { - return func(op *Op) { - op.dedupDataSource = dedup - } -} diff --git a/components/accelerator/nvidia/hw-slowdown/state/state.go b/components/accelerator/nvidia/hw-slowdown/state/state.go deleted file mode 100644 index 327fc473..00000000 --- a/components/accelerator/nvidia/hw-slowdown/state/state.go +++ /dev/null @@ -1,324 +0,0 @@ -// Package state provides the persistent storage layer for the hw slowdown events. -package state - -import ( - "context" - "database/sql" - "encoding/json" - "errors" - "fmt" - "reflect" - "sort" - "time" - - "github.com/leptonai/gpud/log" - "github.com/leptonai/gpud/pkg/sqlite" - - _ "github.com/mattn/go-sqlite3" -) - -const ( - DeprecatedTableNameClockEvents = "components_accelerator_nvidia_query_clock_events" - TableNameHWSlowdown = "components_accelerator_nvidia_hw_slowdown_events" -) - -const ( - // unix timestamp in seconds when the event was observed - ColumnUnixSeconds = "unix_seconds" - - // either "nvml" or "dmesg" - ColumnDataSource = "data_source" - - // gpu uuid - ColumnGPUUUID = "gpu_uuid" - - // reasons for clock events - ColumnReasons = "reasons" -) - -// retain up to 3 days of events -const DefaultRetentionPeriod = 3 * 24 * time.Hour - -type Event struct { - Timestamp int64 - DataSource string - GPUUUID string - Reasons []string -} - -func CreateTable(ctx context.Context, db *sql.DB) error { - _, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", DeprecatedTableNameClockEvents)) - if err != nil { - return err - } - - _, err = db.ExecContext(ctx, fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s ( - %s INTEGER NOT NULL, - %s TEXT NOT NULL, - %s TEXT NOT NULL, - %s TEXT NOT NULL -);`, TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - )) - return err -} - -func InsertEvent(ctx context.Context, db *sql.DB, event Event) error { - log.Logger.Debugw("inserting event", "dataSource", event.DataSource, "uuid", event.GPUUUID, "reasons", event.Reasons) - - insertStatement := fmt.Sprintf(` -INSERT OR REPLACE INTO %s (%s, %s, %s, %s) VALUES (?, ?, ?, NULLIF(?, '')); -`, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - ) - - reasonsBytes, err := json.Marshal(event.Reasons) - if err != nil { - return err - } - - start := time.Now() - _, err = db.ExecContext( - ctx, - insertStatement, - event.Timestamp, - event.DataSource, - event.GPUUUID, - string(reasonsBytes), - ) - sqlite.RecordInsertUpdate(time.Since(start).Seconds()) - - return err -} - -func FindEvent(ctx context.Context, db *sql.DB, event Event) (bool, error) { - selectStatement := fmt.Sprintf(` -SELECT %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ? AND %s = ?; -`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ) - - start := time.Now() - var foundEvent Event - var reasonsRaw string - err := db.QueryRowContext( - ctx, - selectStatement, - event.Timestamp, - event.DataSource, - event.GPUUUID, - ).Scan( - &foundEvent.Timestamp, - &foundEvent.DataSource, - &foundEvent.GPUUUID, - &reasonsRaw, - ) - sqlite.RecordSelect(time.Since(start).Seconds()) - - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return false, nil - } - return false, err - } - if err := json.Unmarshal([]byte(reasonsRaw), &foundEvent.Reasons); err != nil { - return false, err - } - - sort.Strings(foundEvent.Reasons) - sort.Strings(event.Reasons) - - // event at the same time but with different details - if len(foundEvent.Reasons) > 0 && !reflect.DeepEqual(foundEvent.Reasons, event.Reasons) { - return false, nil - } - - // found event - // e.g., same messages in dmesg - return true, nil -} - -// Returns nil if no event is found. -func ReadEvents(ctx context.Context, db *sql.DB, opts ...OpOption) ([]Event, error) { - selectStatement, args, err := createSelectStatementAndArgs(opts...) - if err != nil { - return nil, err - } - - rows, err := db.QueryContext(ctx, selectStatement, args...) - if err != nil { - return nil, err - } - defer rows.Close() - - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return nil, err - } - - // data sources can be multiple (e.g., nvml and nvidia-smi) - // here, we prioritize nvml over nvidia-smi - // for the same unix second, only the one from nvml will be kept - unixToUUIDToDataSource := make(map[int64]map[string]string, 0) - - events := []Event{} - for rows.Next() { - var event Event - var reasonsRaw string - if err := rows.Scan( - &event.Timestamp, - &event.DataSource, - &event.GPUUUID, - &reasonsRaw, - ); err != nil { - return nil, err - } - - _, ok := unixToUUIDToDataSource[event.Timestamp] - if !ok { - unixToUUIDToDataSource[event.Timestamp] = make(map[string]string, 0) - } - - prevDataSource, ok := unixToUUIDToDataSource[event.Timestamp][event.GPUUUID] - currDataSource := event.DataSource - - toInclude := true - if !ok { - // this GPU had NO PREVIOUS event at this unix second ("nvml" or "nvidia-smi") - unixToUUIDToDataSource[event.Timestamp][event.GPUUUID] = currDataSource - } else { - // this GPU HAD A PREVIOUS event at this unix second BUT with different data source - - // this works because we prioritize "nvml" over "nvidia-smi" in the SQL select statement - // "ORDER BY data_source DESC" where "nvml" comes before "nvidia-smi" - if op.dedupDataSource && prevDataSource == "nvml" { - // already had an event with the prioritized "nvml" data source - // thus we DO NOT need to return another event with the other data source ("nvidia-smi") - toInclude = false - } - } - - if !toInclude { - continue - } - - if err := json.Unmarshal([]byte(reasonsRaw), &event.Reasons); err != nil { - return nil, err - } - events = append(events, event) - } - if err := rows.Err(); err != nil { - return nil, err - } - if len(events) == 0 { - return nil, nil - } - - return events, nil -} - -func createSelectStatementAndArgs(opts ...OpOption) (string, []any, error) { - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return "", nil, err - } - - selectStatement := fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ) - - args := []any{} - - if op.sinceUnixSeconds > 0 { - selectStatement += "\nWHERE " - selectStatement += fmt.Sprintf("%s >= ?", ColumnUnixSeconds) - args = append(args, op.sinceUnixSeconds) - } - - // sort by unix seconds and data source - // data source is sorted in reverse order so that "nvml" returns before "nvidia-smi" - // for the same unix second and event type - selectStatement += "\nORDER BY " + ColumnUnixSeconds - if op.sortUnixSecondsAscOrder { - selectStatement += " ASC" - } else { - selectStatement += " DESC" - } - selectStatement += ", " + ColumnDataSource + " DESC" - - if op.limit > 0 { - selectStatement += fmt.Sprintf("\nLIMIT %d", op.limit) - } - - if len(args) == 0 { - return selectStatement, nil, nil - } - return selectStatement, args, nil -} - -func Purge(ctx context.Context, db *sql.DB, opts ...OpOption) (int, error) { - log.Logger.Debugw("purging nvidia clock events") - deleteStatement, args, err := createDeleteStatementAndArgs(opts...) - if err != nil { - return 0, err - } - - start := time.Now() - rs, err := db.ExecContext(ctx, deleteStatement, args...) - sqlite.RecordDelete(time.Since(start).Seconds()) - - if err != nil { - return 0, err - } - - affected, err := rs.RowsAffected() - if err != nil { - return 0, err - } - return int(affected), nil -} - -// ignores order by and limit -func createDeleteStatementAndArgs(opts ...OpOption) (string, []any, error) { - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return "", nil, err - } - - deleteStatement := fmt.Sprintf(`DELETE FROM %s`, - TableNameHWSlowdown, - ) - - args := []any{} - - if op.beforeUnixSeconds > 0 { - deleteStatement += " WHERE " - deleteStatement += fmt.Sprintf("%s < ?", ColumnUnixSeconds) - args = append(args, op.beforeUnixSeconds) - } - - if len(args) == 0 { - return deleteStatement, nil, nil - } - return deleteStatement, args, nil -} diff --git a/components/accelerator/nvidia/hw-slowdown/state/state_test.go b/components/accelerator/nvidia/hw-slowdown/state/state_test.go deleted file mode 100644 index f1ab524f..00000000 --- a/components/accelerator/nvidia/hw-slowdown/state/state_test.go +++ /dev/null @@ -1,767 +0,0 @@ -package state - -import ( - "context" - "database/sql" - "fmt" - "os" - "reflect" - "testing" - "time" - - "github.com/leptonai/gpud/pkg/sqlite" -) - -func TestCreateSelectStatement(t *testing.T) { - tests := []struct { - name string - opts []OpOption - want string - wantArgs []any - wantErr bool - }{ - { - name: "no options", - opts: nil, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s DESC, %s DESC`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with since unix seconds", - opts: []OpOption{WithSince(time.Unix(1234, 0))}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -WHERE %s >= ? -ORDER BY %s DESC, %s DESC`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - { - name: "with ascending order", - opts: []OpOption{WithSortUnixSecondsAscendingOrder()}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s ASC, %s DESC`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with limit", - opts: []OpOption{WithLimit(10)}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s DESC, %s DESC -LIMIT 10`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with all options", - opts: []OpOption{ - WithSince(time.Unix(1234, 0)), - WithSortUnixSecondsAscendingOrder(), - WithLimit(10), - }, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -WHERE %s >= ? -ORDER BY %s ASC, %s DESC -LIMIT 10`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, args, err := createSelectStatementAndArgs(tt.opts...) - if (err != nil) != tt.wantErr { - t.Errorf("createSelectStatement() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("createSelectStatement() = %v, want %v", got, tt.want) - } - if !reflect.DeepEqual(args, tt.wantArgs) { - t.Errorf("createSelectStatement() args = %v, want %v", args, tt.wantArgs) - } - }) - } -} - -func TestOpenMemory(t *testing.T) { - t.Parallel() - - db, err := sqlite.Open(":memory:") - if err != nil { - t.Fatalf("failed to open database: %v", err) - } - defer db.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := CreateTable(ctx, db); err != nil { - t.Fatal("failed to create table:", err) - } -} - -func TestInsertAndFindEvent(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - event := Event{ - Timestamp: time.Now().Unix(), - DataSource: "nvml", - GPUUUID: "31", - Reasons: []string{"GPU has fallen off the bus"}, - } - - // Test insertion - err := InsertEvent(ctx, db, event) - if err != nil { - t.Errorf("InsertEvent failed: %v", err) - } - - // Test finding the event - found, err := FindEvent(ctx, db, event) - if err != nil { - t.Errorf("FindEvent failed: %v", err) - } - if !found { - t.Error("expected to find event, but it wasn't found") - } - - // Test finding event with different details - eventDiffDetails := event - eventDiffDetails.Reasons = []string{"Different details"} - found, err = FindEvent(ctx, db, eventDiffDetails) - if err != nil { - t.Errorf("FindEvent with different details failed: %v", err) - } - if found { - t.Error("expected not to find event with different details") - } -} - -func TestReadEvents_NoRows(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // test ReadEvents with empty table - events, err := ReadEvents(ctx, db) - if err != nil { - t.Errorf("expected no error for empty table, got: %v", err) - } - if events != nil { - t.Errorf("expected nil events for empty table, got: %v", events) - } -} - -func TestReadEvents(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - baseTime := time.Now().Unix() - - // Insert test events - testEvents := []Event{ - { - Timestamp: baseTime, - DataSource: "nvml", - GPUUUID: "31", - Reasons: []string{"First event"}, - }, - { - Timestamp: baseTime + 1, - DataSource: "nvidia-smi", - GPUUUID: "32", - Reasons: []string{"Second event"}, - }, - { - Timestamp: baseTime + 2, - DataSource: "nvml", - GPUUUID: "33", - Reasons: []string{"Third event"}, - }, - } - - for _, event := range testEvents { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert test event: %v", err) - } - } - - // test reading all events - events, err := ReadEvents(ctx, db) - if err != nil { - t.Errorf("ReadEvents failed: %v", err) - } - if len(events) != len(testEvents) { - t.Errorf("expected %d events, got %d", len(testEvents), len(events)) - } - - // test reading events with limit - events, err = ReadEvents(ctx, db, WithLimit(2)) - if err != nil { - t.Errorf("ReadEvents with limit failed: %v", err) - } - if len(events) != 2 { - t.Errorf("expected 2 events with limit, got %d", len(events)) - } - - // test reading events since specific time - events, err = ReadEvents(ctx, db, WithSince(time.Unix(baseTime+1, 0))) - if err != nil { - t.Errorf("ReadEvents with since time failed: %v", err) - } - - t.Logf("searching for events since: %d", baseTime+1) - for _, e := range events { - t.Logf("Found event with timestamp: %d", e.Timestamp) - } - - if len(events) != 2 { - t.Errorf("expected 2 events since baseTime+1, got %d", len(events)) - } - - // Test reading events with ascending order - events, err = ReadEvents(ctx, db, WithSortUnixSecondsAscendingOrder()) - if err != nil { - t.Errorf("ReadEvents with ascending order failed: %v", err) - } - if len(events) != 3 || events[0].Timestamp > events[len(events)-1].Timestamp { - t.Error("Events not properly ordered in ascending order") - } -} - -func TestCreateDeleteStatementAndArgs(t *testing.T) { - tests := []struct { - name string - opts []OpOption - wantStatement string - wantArgs []any - wantErr bool - }{ - { - name: "no options", - opts: []OpOption{}, - wantStatement: fmt.Sprintf("DELETE FROM %s", - TableNameHWSlowdown, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with before unix seconds and limit", - opts: []OpOption{ - WithBefore(time.Unix(1234, 0)), - WithLimit(10), - }, - wantStatement: fmt.Sprintf("DELETE FROM %s WHERE %s < ?", - TableNameHWSlowdown, - ColumnUnixSeconds, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotStatement, gotArgs, err := createDeleteStatementAndArgs(tt.opts...) - - if tt.wantErr { - if err == nil { - t.Error("createDeleteStatementAndArgs() error = nil, wantErr = true") - } - return - } - - if err != nil { - t.Errorf("createDeleteStatementAndArgs() error = %v, wantErr = false", err) - return - } - - if gotStatement != tt.wantStatement { - t.Errorf("createDeleteStatementAndArgs() statement = %v, want %v", gotStatement, tt.wantStatement) - } - - if !reflect.DeepEqual(gotArgs, tt.wantArgs) { - t.Errorf("createDeleteStatementAndArgs() args = %v, want %v", gotArgs, tt.wantArgs) - } - }) - } -} - -func TestPurge(t *testing.T) { - t.Parallel() - - now := time.Now() - - tests := []struct { - name string - setup []Event - purgeOpts []OpOption - wantErr bool - wantPurged int - wantCount int - validate func(*testing.T, []Event) - }{ - { - name: "delete events before timestamp", - setup: []Event{ - {Timestamp: 1000, DataSource: "nvml", GPUUUID: "1", Reasons: []string{"detail1"}}, - {Timestamp: 2000, DataSource: "nvml", GPUUUID: "2", Reasons: []string{"detail2"}}, - {Timestamp: 3000, DataSource: "nvml", GPUUUID: "3", Reasons: []string{"detail3"}}, - }, - purgeOpts: []OpOption{WithBefore(time.Unix(2500, 0))}, - wantPurged: 2, - wantCount: 1, - validate: func(t *testing.T, events []Event) { - if len(events) == 0 || events[0].Timestamp != 3000 { - t.Errorf("expected event with timestamp 3000, got %+v", events) - } - }, - }, - { - name: "delete all events", - setup: []Event{ - {Timestamp: 1000, DataSource: "nvml", GPUUUID: "1", Reasons: []string{"detail1"}}, - {Timestamp: 2000, DataSource: "nvml", GPUUUID: "2", Reasons: []string{"detail2"}}, - }, - purgeOpts: []OpOption{}, - wantPurged: 2, - wantCount: 0, - }, - { - name: "delete events with large dataset", - setup: func() []Event { - events := make([]Event, 100) - baseTime := now.Unix() - for i := 0; i < 100; i++ { - events[i] = Event{ - Timestamp: baseTime + int64(i*60), // Events 1 minute apart - DataSource: "nvml", - GPUUUID: fmt.Sprintf("%d", i+1), - Reasons: []string{fmt.Sprintf("detail%d", i+1)}, - } - } - return events - }(), - purgeOpts: []OpOption{WithBefore(now.Add(30 * time.Minute))}, - wantPurged: 30, - wantCount: 70, - validate: func(t *testing.T, events []Event) { - if len(events) != 70 { - t.Errorf("expected 70 events, got %d", len(events)) - } - cutoff := now.Add(30 * time.Minute).Unix() - for _, e := range events { - if e.Timestamp < cutoff { - t.Errorf("found event with timestamp %d, which is before cutoff %d", - e.Timestamp, cutoff) - } - } - }, - }, - } - - for _, tt := range tests { - tt := tt // capture range variable - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // setup fresh database for each test - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Insert test data - for _, event := range tt.setup { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert test event: %v", err) - } - } - - // perform deletion - purged, err := Purge(ctx, db, tt.purgeOpts...) - if (err != nil) != tt.wantErr { - t.Errorf("DeleteEvents() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if purged != tt.wantPurged { - t.Errorf("DeleteEvents() purged = %v, want %v", purged, tt.wantPurged) - } - - // verify results - events, err := ReadEvents(ctx, db) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - if len(events) != tt.wantCount { - t.Errorf("expected %d events, got %d", tt.wantCount, len(events)) - } - - if tt.validate != nil { - tt.validate(t, events) - } - }) - } -} - -func setupTestDB(t *testing.T) (*sql.DB, func()) { - tmpfile, err := os.CreateTemp("", "test-nvidia-*.db") - if err != nil { - t.Fatalf("failed to create temp file: %v", err) - } - tmpfile.Close() - - db, err := sqlite.Open(tmpfile.Name()) - if err != nil { - os.Remove(tmpfile.Name()) - t.Fatalf("failed to open database: %v", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := CreateTable(ctx, db); err != nil { - db.Close() - os.Remove(tmpfile.Name()) - t.Fatalf("failed to create table: %v", err) - } - - cleanup := func() { - db.Close() - os.Remove(tmpfile.Name()) - } - return db, cleanup -} - -func TestDedupEvents(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Test case 1: Basic deduplication with nvml and nvidia-smi - // place nvidia-smi first but read will return nvml first - events := []Event{ - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup enabled - readEvents, err := ReadEvents(ctx, db, WithDedupDataSource(true)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should only get one event (the nvml one) - if len(readEvents) != 1 { - t.Errorf("expected 1 event after dedup, got %d", len(readEvents)) - } - - if len(readEvents) > 0 { - got := readEvents[0] - want := events[1] // The nvml event - if got.DataSource != want.DataSource { - t.Errorf("expected data source %s, got %s", want.DataSource, got.DataSource) - } - if !reflect.DeepEqual(got.Reasons, want.Reasons) { - t.Errorf("expected reasons %v, got %v", want.Reasons, got.Reasons) - } - } - - // Test case 2: Multiple events with different timestamps - events = []Event{ - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1001, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail2"}, - }, - } - - // Clear the table - if _, err := db.ExecContext(ctx, "DELETE FROM "+TableNameHWSlowdown); err != nil { - t.Fatalf("failed to clear table: %v", err) - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup enabled - readEvents, err = ReadEvents(ctx, db, WithDedupDataSource(true)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get both events since they have different timestamps - if len(readEvents) != 2 { - t.Errorf("expected 2 events for different timestamps, got %d", len(readEvents)) - } - - // Test case 3: Multiple GPUs at same timestamp - events = []Event{ - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "2", - Reasons: []string{"detail2"}, - }, - } - - // Clear the table - if _, err := db.ExecContext(ctx, "DELETE FROM "+TableNameHWSlowdown); err != nil { - t.Fatalf("failed to clear table: %v", err) - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup enabled - readEvents, err = ReadEvents(ctx, db, WithDedupDataSource(true)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get both events since they are for different GPUs - if len(readEvents) != 2 { - t.Errorf("expected 2 events for different GPUs, got %d", len(readEvents)) - } -} - -func TestDataSourceOrdering(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Test case 1: Same timestamp, same GPU - verify nvml comes before nvidia-smi - events := []Event{ - // Intentionally insert nvidia-smi first to verify ordering is by data source - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup disabled to verify ordering - readEvents, err := ReadEvents(ctx, db, WithDedupDataSource(false)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get both events with nvml first - if len(readEvents) != 2 { - t.Errorf("expected 2 events without dedup, got %d", len(readEvents)) - } - - if len(readEvents) == 2 { - // First event should be nvml - if readEvents[0].DataSource != "nvml" { - t.Errorf("expected first event to be from nvml, got %s", readEvents[0].DataSource) - } - // Second event should be nvidia-smi - if readEvents[1].DataSource != "nvidia-smi" { - t.Errorf("expected second event to be from nvidia-smi, got %s", readEvents[1].DataSource) - } - } - - // Test case 2: Multiple events with mixed timestamps - verify ordering within same timestamp - events = []Event{ - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1001, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail2"}, - }, - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1001, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail2"}, - }, - } - - // Clear the table - if _, err := db.ExecContext(ctx, "DELETE FROM "+TableNameHWSlowdown); err != nil { - t.Fatalf("failed to clear table: %v", err) - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup disabled - readEvents, err = ReadEvents(ctx, db, WithDedupDataSource(false)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get all 4 events - if len(readEvents) != 4 { - t.Errorf("expected 4 events without dedup, got %d", len(readEvents)) - } - - if len(readEvents) == 4 { - // For each unique timestamp, nvml should come before nvidia-smi - // Events should be ordered by timestamp DESC, then data source DESC - expectedOrder := []struct { - unixSeconds int64 - dataSource string - }{ - {1001, "nvml"}, - {1001, "nvidia-smi"}, - {1000, "nvml"}, - {1000, "nvidia-smi"}, - } - - for i, expected := range expectedOrder { - got := readEvents[i] - if got.Timestamp != expected.unixSeconds || got.DataSource != expected.dataSource { - t.Errorf("event[%d]: expected {unix: %d, source: %s}, got {unix: %d, source: %s}", - i, expected.unixSeconds, expected.dataSource, got.Timestamp, got.DataSource) - } - } - } -} diff --git a/components/accelerator/nvidia/infiniband/component.go b/components/accelerator/nvidia/infiniband/component.go index feab8e10..7535c2f7 100644 --- a/components/accelerator/nvidia/infiniband/component.go +++ b/components/accelerator/nvidia/infiniband/component.go @@ -18,14 +18,6 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_infiniband_id.Name) return &component{ diff --git a/components/accelerator/nvidia/info/component.go b/components/accelerator/nvidia/info/component.go index ed5388dd..cad53cca 100644 --- a/components/accelerator/nvidia/info/component.go +++ b/components/accelerator/nvidia/info/component.go @@ -19,14 +19,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/memory/component.go b/components/accelerator/nvidia/memory/component.go index 60ce999b..d80802d5 100644 --- a/components/accelerator/nvidia/memory/component.go +++ b/components/accelerator/nvidia/memory/component.go @@ -23,14 +23,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/nccl/component.go b/components/accelerator/nvidia/nccl/component.go index 6fac3669..b079c266 100644 --- a/components/accelerator/nvidia/nccl/component.go +++ b/components/accelerator/nvidia/nccl/component.go @@ -22,14 +22,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_nccl_id.Name) return &component{ diff --git a/components/accelerator/nvidia/nvlink/component.go b/components/accelerator/nvidia/nvlink/component.go index 931a260b..dbade037 100644 --- a/components/accelerator/nvidia/nvlink/component.go +++ b/components/accelerator/nvidia/nvlink/component.go @@ -23,14 +23,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/peermem/component.go b/components/accelerator/nvidia/peermem/component.go index 7b7294f6..019ed281 100644 --- a/components/accelerator/nvidia/peermem/component.go +++ b/components/accelerator/nvidia/peermem/component.go @@ -22,14 +22,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_peermem_id.Name) return &component{ diff --git a/components/accelerator/nvidia/persistence-mode/component.go b/components/accelerator/nvidia/persistence-mode/component.go index d68965ee..d15e4b5a 100644 --- a/components/accelerator/nvidia/persistence-mode/component.go +++ b/components/accelerator/nvidia/persistence-mode/component.go @@ -18,14 +18,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_persistence_mode_id.Name) return &component{ diff --git a/components/accelerator/nvidia/power/component.go b/components/accelerator/nvidia/power/component.go index 11261afe..fee1419c 100644 --- a/components/accelerator/nvidia/power/component.go +++ b/components/accelerator/nvidia/power/component.go @@ -22,14 +22,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_power_id.Name) return &component{ diff --git a/components/accelerator/nvidia/processes/component.go b/components/accelerator/nvidia/processes/component.go index 75f21d39..766af3ea 100644 --- a/components/accelerator/nvidia/processes/component.go +++ b/components/accelerator/nvidia/processes/component.go @@ -23,14 +23,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/query/nvidia_smi_query.go b/components/accelerator/nvidia/query/nvidia_smi_query.go index 837a4d62..85dadcc4 100644 --- a/components/accelerator/nvidia/query/nvidia_smi_query.go +++ b/components/accelerator/nvidia/query/nvidia_smi_query.go @@ -10,12 +10,15 @@ import ( "sort" "strings" "sync" + "time" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" "github.com/leptonai/gpud/log" "github.com/leptonai/gpud/pkg/file" "github.com/leptonai/gpud/pkg/process" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" ) @@ -461,8 +464,8 @@ func (o *SMIOutput) FindHWSlowdownErrs() []string { return errs } -func (o *SMIOutput) HWSlowdownEvents(unixSeconds int64) []nvidia_hw_slowdown_state.Event { - var events []nvidia_hw_slowdown_state.Event +func (o *SMIOutput) HWSlowdownEvents(unixSeconds int64) []components.Event { + var events []components.Event for _, g := range o.GPUs { if g.ClockEventReasons == nil { continue @@ -474,11 +477,15 @@ func (o *SMIOutput) HWSlowdownEvents(unixSeconds int64) []nvidia_hw_slowdown_sta } sort.Strings(hwSlowdownErrs) - events = append(events, nvidia_hw_slowdown_state.Event{ - Timestamp: unixSeconds, - DataSource: "nvidia-smi", - GPUUUID: g.ID, - Reasons: hwSlowdownErrs, + events = append(events, components.Event{ + Time: metav1.Time{Time: time.Unix(unixSeconds, 0).UTC()}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: strings.Join(hwSlowdownErrs, ", "), + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": g.ID, + }, }) } return events diff --git a/components/accelerator/nvidia/query/nvml/nvml.go b/components/accelerator/nvidia/query/nvml/nvml.go index a3839ad1..163555f7 100644 --- a/components/accelerator/nvidia/query/nvml/nvml.go +++ b/components/accelerator/nvidia/query/nvml/nvml.go @@ -4,10 +4,10 @@ package nvml import ( "context" - "database/sql" "errors" "fmt" "sort" + "strings" "sync" "time" @@ -16,8 +16,9 @@ import ( "github.com/NVIDIA/go-nvml/pkg/nvml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" - nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state" + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + events_db "github.com/leptonai/gpud/components/db" mocknvml "github.com/leptonai/gpud/e2e/mock/nvml" "github.com/leptonai/gpud/log" ) @@ -83,10 +84,8 @@ type instance struct { // maps from uuid to device info devices map[string]*DeviceInfo - // writable database instance - dbRW *sql.DB - // read-only database instance - dbRO *sql.DB + xidEventsStore events_db.Store + hwslowdownEventsStore events_db.Store clockEventsSupported bool @@ -254,8 +253,8 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { nvmlExists: nvmlExists, nvmlExistsMsg: nvmlExistsMsg, - dbRW: op.dbRW, - dbRO: op.dbRO, + xidEventsStore: op.xidEventsStore, + hwslowdownEventsStore: op.hwslowdownEventsStore, clockEventsSupported: clockEventsSupported, @@ -287,13 +286,6 @@ func (inst *instance) Start() error { inst.mu.Lock() defer inst.mu.Unlock() - log.Logger.Debugw("creating xid sxid event history table") - ctx, cancel := context.WithTimeout(inst.rootCtx, 10*time.Second) - defer cancel() - if err := nvidia_xid_sxid_state.CreateTableXidSXidEventHistory(ctx, inst.dbRW); err != nil { - return err - } - // "NVIDIA Xid 79: GPU has fallen off the bus" may fail this syscall with: // "error getting device handle for index '6': Unknown Error" log.Logger.Debugw("getting devices from device library") @@ -504,24 +496,28 @@ func (inst *instance) Get() (*Output, error) { latestInfo.ClockEvents = &clockEvents - ev := nvidia_hw_slowdown_state.Event{ - Timestamp: clockEvents.Time.Unix(), - DataSource: "nvml", - GPUUUID: devInfo.UUID, - Reasons: clockEvents.HWSlowdownReasons, + ev := components.Event{ + Time: clockEvents.Time, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: strings.Join(clockEvents.HWSlowdownReasons, ", "), + ExtraInfo: map[string]string{ + "data_source": "nvml", + "gpu_uuid": devInfo.UUID, + }, } cctx, ccancel := context.WithTimeout(context.Background(), 15*time.Second) - found, err := nvidia_hw_slowdown_state.FindEvent(cctx, inst.dbRO, ev) + found, err := inst.hwslowdownEventsStore.Find(cctx, ev) ccancel() if err != nil { log.Logger.Warnw("failed to find clock events from db", "error", err, "gpu_uuid", devInfo.UUID) joinedErrs = append(joinedErrs, fmt.Errorf("failed to find clock events: %w (GPU uuid %s)", err, devInfo.UUID)) - } else if !found { + } else if found != nil { log.Logger.Warnw("detected hw slowdown clock events", "hwSlowdownReasons", clockEvents.HWSlowdownReasons) cctx, ccancel = context.WithTimeout(context.Background(), 15*time.Second) - err = nvidia_hw_slowdown_state.InsertEvent(cctx, inst.dbRW, ev) + err = inst.hwslowdownEventsStore.Insert(cctx, ev) ccancel() if err != nil { log.Logger.Warnw("failed to insert clock events to db", "error", err, "gpu_uuid", devInfo.UUID) diff --git a/components/accelerator/nvidia/query/nvml/options.go b/components/accelerator/nvidia/query/nvml/options.go index 58e5a8e0..dd08623b 100644 --- a/components/accelerator/nvidia/query/nvml/options.go +++ b/components/accelerator/nvidia/query/nvml/options.go @@ -1,16 +1,15 @@ package nvml import ( - "database/sql" - "github.com/NVIDIA/go-nvml/pkg/nvml" - "github.com/leptonai/gpud/pkg/sqlite" + + events_db "github.com/leptonai/gpud/components/db" ) type Op struct { - dbRW *sql.DB - dbRO *sql.DB - gpmMetricsIDs map[nvml.GpmMetricId]struct{} + xidEventsStore events_db.Store + hwslowdownEventsStore events_db.Store + gpmMetricsIDs map[nvml.GpmMetricId]struct{} } type OpOption func(*Op) @@ -19,37 +18,18 @@ func (op *Op) applyOpts(opts []OpOption) error { for _, opt := range opts { opt(op) } - if op.dbRW == nil { - var err error - op.dbRW, err = sqlite.Open(":memory:") - if err != nil { - return err - } - } - if op.dbRO == nil { - var err error - op.dbRO, err = sqlite.Open(":memory:", sqlite.WithReadOnly(true)) - if err != nil { - return err - } - } return nil } -// Specifies the database instance to persist nvidia components data -// (e.g., xid/sxid events). Must be a writable database instance. -// If not specified, a new in-memory database is created. -func WithDBRW(db *sql.DB) OpOption { +func WithXidEventsStore(store events_db.Store) OpOption { return func(op *Op) { - op.dbRW = db + op.xidEventsStore = store } } -// Specifies the read-only database instance. -// If not specified, a new in-memory database is created. -func WithDBRO(db *sql.DB) OpOption { +func WithHWSlowdownEventsStore(store events_db.Store) OpOption { return func(op *Op) { - op.dbRO = db + op.hwslowdownEventsStore = store } } diff --git a/components/accelerator/nvidia/query/nvml/xid.go b/components/accelerator/nvidia/query/nvml/xid.go index 5ac649db..08e16877 100644 --- a/components/accelerator/nvidia/query/nvml/xid.go +++ b/components/accelerator/nvidia/query/nvml/xid.go @@ -6,8 +6,9 @@ import ( "fmt" "time" + "github.com/leptonai/gpud/components" nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid" - nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state" + "github.com/leptonai/gpud/components/common" "github.com/leptonai/gpud/log" "github.com/NVIDIA/go-nvml/pkg/nvml" @@ -132,10 +133,10 @@ func (inst *instance) pollXidEvents() { continue } - msg := "received event with a known xid" + msg := fmt.Sprintf("received event with a known xid: %d", xid) xidDetail, ok := nvidia_query_xid.GetDetail(int(xid)) if !ok { - msg = "received event but xid unknown" + msg = fmt.Sprintf("received event but xid unknown: %d", xid) } log.Logger.Warnw("detected xid event", "xid", xid, "message", msg) @@ -148,8 +149,21 @@ func (inst *instance) pollXidEvents() { deviceUUIDErr = fmt.Errorf("failed to get device UUID: %v", nvml.ErrorString(ret)) } - event := &XidEvent{ - Time: metav1.Time{Time: time.Now().UTC()}, + now := time.Now().UTC() + + event := components.Event{ + Time: metav1.Time{Time: now}, + Name: "error_xid", + Type: common.EventTypeUnknown, + Message: msg, + } + if xidDetail != nil { + event.Type = xidDetail.EventType + event.SuggestedActions = xidDetail.SuggestedActionsByGPUd + } + + xidEvent := &XidEvent{ + Time: metav1.Time{Time: now}, SampleDuration: metav1.Duration{Duration: 5 * time.Second}, DeviceUUID: deviceUUID, @@ -170,32 +184,27 @@ func (inst *instance) pollXidEvents() { Error: deviceUUIDErr, } - eb, err := event.JSON() + eb, err := xidEvent.JSON() if err != nil { log.Logger.Errorw("failed to marshal xid event", "error", err) continue } + event.ExtraInfo = map[string]string{ + "xid_event": string(eb), + } - // no need to check duplicate entries, assuming nvml event poller does not return old events ctx, cancel := context.WithTimeout(inst.rootCtx, 10*time.Second) - werr := nvidia_xid_sxid_state.InsertEvent(ctx, inst.dbRW, nvidia_xid_sxid_state.Event{ - UnixSeconds: event.Time.Unix(), - DataSource: "nvml", - EventType: "xid", - EventID: int64(event.Xid), - DeviceID: event.DeviceUUID, - EventDetails: string(eb), - }) + werr := inst.xidEventsStore.Insert(ctx, event) cancel() if werr != nil { - log.Logger.Errorw("failed to insert xid event into database", "error", werr) + log.Logger.Errorw("failed to insert xid event into events store", "error", werr) } select { case <-inst.rootCtx.Done(): return - case inst.xidEventCh <- event: - log.Logger.Warnw("notified xid event", "event", event) + case inst.xidEventCh <- xidEvent: + log.Logger.Warnw("notified xid event", "event", xidEvent) default: log.Logger.Warnw("xid event channel is full, skipping event") } diff --git a/components/accelerator/nvidia/query/options.go b/components/accelerator/nvidia/query/options.go index 3f5d4edf..24d6485b 100644 --- a/components/accelerator/nvidia/query/options.go +++ b/components/accelerator/nvidia/query/options.go @@ -1,10 +1,12 @@ package query -import "database/sql" +import ( + events_db "github.com/leptonai/gpud/components/db" +) type Op struct { - dbRW *sql.DB - dbRO *sql.DB + xidEventsStore events_db.Store + hwslowdownEventsStore events_db.Store nvidiaSMICommand string nvidiaSMIQueryCommand string ibstatCommand string @@ -35,15 +37,15 @@ func (op *Op) applyOpts(opts []OpOption) error { return nil } -func WithDBRW(db *sql.DB) OpOption { +func WithXidEventsStore(store events_db.Store) OpOption { return func(op *Op) { - op.dbRW = db + op.xidEventsStore = store } } -func WithDBRO(db *sql.DB) OpOption { +func WithHWSlowdownEventsStore(store events_db.Store) OpOption { return func(op *Op) { - op.dbRO = db + op.hwslowdownEventsStore = store } } diff --git a/components/accelerator/nvidia/query/query.go b/components/accelerator/nvidia/query/query.go index af841ff4..99a321bf 100644 --- a/components/accelerator/nvidia/query/query.go +++ b/components/accelerator/nvidia/query/query.go @@ -10,7 +10,6 @@ import ( "sync" "time" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" "github.com/leptonai/gpud/components/accelerator/nvidia/query/infiniband" metrics_clock "github.com/leptonai/gpud/components/accelerator/nvidia/query/metrics/clock" metrics_clockspeed "github.com/leptonai/gpud/components/accelerator/nvidia/query/metrics/clock-speed" @@ -89,8 +88,8 @@ func Get(ctx context.Context, opts ...OpOption) (output any, err error) { if err := nvml.StartDefaultInstance( ctx, - nvml.WithDBRW(op.dbRW), - nvml.WithDBRO(op.dbRO), + nvml.WithXidEventsStore(op.xidEventsStore), + nvml.WithHWSlowdownEventsStore(op.hwslowdownEventsStore), nvml.WithGPMMetricsID( go_nvml.GPM_METRIC_SM_OCCUPANCY, go_nvml.GPM_METRIC_INTEGER_UTIL, @@ -282,23 +281,23 @@ func Get(ctx context.Context, opts ...OpOption) (output any, err error) { events := o.SMI.HWSlowdownEvents(truncNowUTC.Unix()) for _, event := range events { cctx, ccancel = context.WithTimeout(ctx, time.Minute) - found, err := nvidia_hw_slowdown_state.FindEvent(cctx, op.dbRO, event) + found, err := op.hwslowdownEventsStore.Find(cctx, event) ccancel() if err != nil { - log.Logger.Warnw("failed to find clock events from db", "error", err, "gpu_uuid", event.GPUUUID) + log.Logger.Warnw("failed to find clock events from db", "error", err, "info", event.ExtraInfo) o.SMIQueryErrors = append(o.SMIQueryErrors, fmt.Sprintf("failed to find clock events: %v", err)) continue } - if found { + if found != nil { continue } - log.Logger.Warnw("detected hw slowdown clock events", "gpu_uuid", event.GPUUUID) + log.Logger.Warnw("detected hw slowdown clock events", "info", event.ExtraInfo) cctx, ccancel = context.WithTimeout(ctx, time.Minute) - err = nvidia_hw_slowdown_state.InsertEvent(cctx, op.dbRW, event) + err = op.hwslowdownEventsStore.Insert(cctx, event) ccancel() if err != nil { - log.Logger.Warnw("failed to insert clock events to db", "error", err, "gpu_uuid", event.GPUUUID) + log.Logger.Warnw("failed to insert clock events to db", "error", err, "info", event.ExtraInfo) o.SMIQueryErrors = append(o.SMIQueryErrors, fmt.Sprintf("failed to persist clock events: %v", err)) } } diff --git a/components/accelerator/nvidia/remapped-rows/component.go b/components/accelerator/nvidia/remapped-rows/component.go index 07c9e8f3..1f813f17 100644 --- a/components/accelerator/nvidia/remapped-rows/component.go +++ b/components/accelerator/nvidia/remapped-rows/component.go @@ -23,14 +23,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/temperature/component.go b/components/accelerator/nvidia/temperature/component.go index 4f84c92c..1872f346 100644 --- a/components/accelerator/nvidia/temperature/component.go +++ b/components/accelerator/nvidia/temperature/component.go @@ -23,14 +23,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/accelerator/nvidia/utilization/component.go b/components/accelerator/nvidia/utilization/component.go index 61eb695c..ae720f13 100644 --- a/components/accelerator/nvidia/utilization/component.go +++ b/components/accelerator/nvidia/utilization/component.go @@ -23,14 +23,6 @@ func New(ctx context.Context, cfg nvidia_common.Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.SetDefaultPoller( - nvidia_query.WithDBRW(cfg.Query.State.DBRW), - nvidia_query.WithDBRO(cfg.Query.State.DBRO), - nvidia_query.WithNvidiaSMICommand(cfg.NvidiaSMICommand), - nvidia_query.WithNvidiaSMIQueryCommand(cfg.NvidiaSMIQueryCommand), - nvidia_query.WithIbstatCommand(cfg.IbstatCommand), - nvidia_query.WithInfinibandClassDirectory(cfg.InfinibandClassDirectory), - ) nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ diff --git a/components/diagnose/scan.go b/components/diagnose/scan.go index 1e945392..e949716b 100644 --- a/components/diagnose/scan.go +++ b/components/diagnose/scan.go @@ -9,12 +9,14 @@ import ( "time" "github.com/dustin/go-humanize" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" + nvidia_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id" + nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" "github.com/leptonai/gpud/components/accelerator/nvidia/query" nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query" nvidia_query_nvml "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml" nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid" nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid" + events_db "github.com/leptonai/gpud/components/db" "github.com/leptonai/gpud/components/dmesg" query_log_common "github.com/leptonai/gpud/components/query/log/common" query_log_tail "github.com/leptonai/gpud/components/query/log/tail" @@ -124,17 +126,30 @@ func Scan(ctx context.Context, opts ...OpOption) error { } defer db.Close() - // "nvidia_query.Get" assumes that the "clock-events-state" table exists - // pre-create since this is a one-off operation - // TODO: move these into a single place - if err := nvidia_hw_slowdown_state.CreateTable(ctx, db); err != nil { - log.Logger.Fatalw("failed to create clock events state table", "error", err) + eventsStoreNvidiaErrorXid, err := events_db.NewStore( + db, + db, + events_db.CreateDefaultTableName(nvidia_error_xid_id.Name), + 3*24*time.Hour, + ) + if err != nil { + log.Logger.Fatalw("failed to create events store", "error", err) + } + + eventsStoreNvidiaHWSlowdown, err := events_db.NewStore( + db, + db, + events_db.CreateDefaultTableName(nvidia_hw_slowdown_id.Name), + 3*24*time.Hour, + ) + if err != nil { + log.Logger.Fatalw("failed to create events store", "error", err) } outputRaw, err := nvidia_query.Get( ctx, - nvidia_query.WithDBRW(db), - nvidia_query.WithDBRO(db), + nvidia_query.WithXidEventsStore(eventsStoreNvidiaErrorXid), + nvidia_query.WithHWSlowdownEventsStore(eventsStoreNvidiaHWSlowdown), nvidia_query.WithNvidiaSMICommand(op.nvidiaSMICommand), nvidia_query.WithNvidiaSMIQueryCommand(op.nvidiaSMIQueryCommand), nvidia_query.WithIbstatCommand(op.ibstatCommand), diff --git a/components/diagnose/scan_test.go b/components/diagnose/scan_test.go index d8b0fede..3f1fbbf5 100644 --- a/components/diagnose/scan_test.go +++ b/components/diagnose/scan_test.go @@ -10,7 +10,7 @@ func TestScan(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - if err := Scan(ctx); err != nil { + if err := Scan(ctx, WithDebug(true), WithNetcheck(false), WithDiskcheck(false)); err != nil { t.Fatalf("error scanning: %+v", err) } } diff --git a/components/fuse/component.go b/components/fuse/component.go index e9982c34..ec8abe96 100644 --- a/components/fuse/component.go +++ b/components/fuse/component.go @@ -5,46 +5,53 @@ import ( "context" "database/sql" "fmt" - "strconv" - "strings" "time" "github.com/leptonai/gpud/components" - "github.com/leptonai/gpud/components/common" + events_db "github.com/leptonai/gpud/components/db" fuse_id "github.com/leptonai/gpud/components/fuse/id" "github.com/leptonai/gpud/components/fuse/metrics" - "github.com/leptonai/gpud/components/fuse/state" "github.com/leptonai/gpud/components/query" "github.com/leptonai/gpud/log" - "github.com/dustin/go-humanize" "github.com/prometheus/client_golang/prometheus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func New(ctx context.Context, cfg Config) components.Component { +func New(ctx context.Context, cfg Config) (components.Component, error) { + eventsStore, err := events_db.NewStore( + cfg.Query.State.DBRW, + cfg.Query.State.DBRO, + events_db.CreateDefaultTableName(fuse_id.Name), + 3*24*time.Hour, + ) + if err != nil { + return nil, err + } + cfg.Query.SetDefaultsIfNotSet() - setDefaultPoller(cfg) + setDefaultPoller(cfg, eventsStore) cctx, ccancel := context.WithCancel(ctx) getDefaultPoller().Start(cctx, cfg.Query, fuse_id.Name) return &component{ - cfg: cfg, - rootCtx: ctx, - cancel: ccancel, - poller: getDefaultPoller(), - } + cfg: cfg, + rootCtx: ctx, + cancel: ccancel, + poller: getDefaultPoller(), + eventsStore: eventsStore, + }, nil } var _ components.Component = (*component)(nil) type component struct { - cfg Config - rootCtx context.Context - cancel context.CancelFunc - poller query.Poller - gatherer prometheus.Gatherer + cfg Config + rootCtx context.Context + cancel context.CancelFunc + poller query.Poller + eventsStore events_db.Store + gatherer prometheus.Gatherer } func (c *component) Name() string { return fuse_id.Name } @@ -96,51 +103,7 @@ const ( ) func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) { - events, err := state.ReadEvents(ctx, c.cfg.Query.State.DBRO, state.WithSince(since)) - if err != nil { - return nil, err - } - - if len(events) == 0 { - log.Logger.Debugw("no event found", "component", c.Name(), "since", humanize.Time(since)) - return nil, nil - } - - log.Logger.Debugw("found events", "component", c.Name(), "since", humanize.Time(since), "count", len(events)) - convertedEvents := make([]components.Event, 0, len(events)) - for _, event := range events { - msgs := []string{} - if event.CongestedPercentAgainstThreshold > c.cfg.CongestedPercentAgainstThreshold { - msgs = append(msgs, fmt.Sprintf("congested percent against threshold %.2f exceeds threshold %.2f", event.CongestedPercentAgainstThreshold, c.cfg.CongestedPercentAgainstThreshold)) - } - if event.MaxBackgroundPercentAgainstThreshold > c.cfg.MaxBackgroundPercentAgainstThreshold { - msgs = append(msgs, fmt.Sprintf("max background percent against threshold %.2f exceeds threshold %.2f", event.MaxBackgroundPercentAgainstThreshold, c.cfg.MaxBackgroundPercentAgainstThreshold)) - } - if len(msgs) == 0 { - continue - } - - eb, err := event.JSON() - if err != nil { - continue - } - - convertedEvents = append(convertedEvents, components.Event{ - Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0).UTC()}, - Name: EventNameFuseConnections, - Type: common.EventTypeCritical, - Message: strings.Join(msgs, ", "), - ExtraInfo: map[string]string{ - EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10), - EventKeyData: string(eb), - EventKeyEncoding: EventValueEncodingJSON, - }, - }) - } - if len(convertedEvents) == 0 { - return nil, nil - } - return convertedEvents, nil + return c.eventsStore.Get(ctx, since) } func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) { diff --git a/components/fuse/component_output.go b/components/fuse/component_output.go index 85a1384f..5c2139d7 100644 --- a/components/fuse/component_output.go +++ b/components/fuse/component_output.go @@ -2,15 +2,20 @@ package fuse import ( "context" + "fmt" + "strings" "sync" "time" + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + events_db "github.com/leptonai/gpud/components/db" fuse_id "github.com/leptonai/gpud/components/fuse/id" "github.com/leptonai/gpud/components/fuse/metrics" - "github.com/leptonai/gpud/components/fuse/state" components_metrics "github.com/leptonai/gpud/components/metrics" "github.com/leptonai/gpud/components/query" "github.com/leptonai/gpud/pkg/fuse" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type Output struct { @@ -23,12 +28,12 @@ var ( ) // only set once since it relies on the kube client and specific port -func setDefaultPoller(cfg Config) { +func setDefaultPoller(cfg Config, eventsStore events_db.Store) { defaultPollerOnce.Do(func() { defaultPoller = query.New( fuse_id.Name, cfg.Query, - CreateGet(cfg), + CreateGet(cfg, eventsStore), nil, ) }) @@ -38,7 +43,7 @@ func getDefaultPoller() query.Poller { return defaultPoller } -func CreateGet(cfg Config) query.GetFunc { +func CreateGet(cfg Config, eventsStore events_db.Store) query.GetFunc { return func(ctx context.Context) (_ any, e error) { defer func() { if e != nil { @@ -64,27 +69,47 @@ func CreateGet(cfg Config) query.GetFunc { continue } - prev, err := state.FindEvent(ctx, cfg.Query.State.DBRO, now.Unix(), info.DeviceName) - if err != nil { + if err := metrics.SetConnectionsCongestedPercent(ctx, info.DeviceName, info.CongestedPercent, now); err != nil { + return nil, err + } + if err := metrics.SetConnectionsMaxBackgroundPercent(ctx, info.DeviceName, info.MaxBackgroundPercent, now); err != nil { return nil, err } - if prev == nil { + + msgs := []string{} + if info.CongestedPercent > cfg.CongestedPercentAgainstThreshold { + msgs = append(msgs, fmt.Sprintf("congested percent against threshold %.2f exceeds threshold %.2f", info.CongestedPercent, cfg.CongestedPercentAgainstThreshold)) + } + if info.MaxBackgroundPercent > cfg.MaxBackgroundPercentAgainstThreshold { + msgs = append(msgs, fmt.Sprintf("max background percent against threshold %.2f exceeds threshold %.2f", info.MaxBackgroundPercent, cfg.MaxBackgroundPercentAgainstThreshold)) + } + if len(msgs) == 0 { continue } - if err := state.InsertEvent(ctx, cfg.Query.State.DBRW, state.Event{ - UnixSeconds: now.Unix(), - DeviceName: info.DeviceName, - CongestedPercentAgainstThreshold: info.CongestedPercent, - MaxBackgroundPercentAgainstThreshold: info.MaxBackgroundPercent, - }); err != nil { - return nil, err + ib, err := info.JSON() + if err != nil { + continue + } + ev := components.Event{ + Time: metav1.Time{Time: now.UTC()}, + Name: EventNameFuseConnections, + Type: common.EventTypeCritical, + Message: info.DeviceName + ": " + strings.Join(msgs, ", "), + ExtraInfo: map[string]string{ + "data": string(ib), + "encoding": "json", + }, } - if err := metrics.SetConnectionsCongestedPercent(ctx, info.DeviceName, info.CongestedPercent, now); err != nil { + found, err := eventsStore.Find(ctx, ev) + if err != nil { return nil, err } - if err := metrics.SetConnectionsMaxBackgroundPercent(ctx, info.DeviceName, info.MaxBackgroundPercent, now); err != nil { + if found == nil { + continue + } + if err := eventsStore.Insert(ctx, ev); err != nil { return nil, err } diff --git a/components/fuse/state/options.go b/components/fuse/state/options.go deleted file mode 100644 index 20ec04dd..00000000 --- a/components/fuse/state/options.go +++ /dev/null @@ -1,67 +0,0 @@ -package state - -import ( - "errors" - "time" -) - -var ErrInvalidLimit = errors.New("limit must be greater than or equal to 0") - -type Op struct { - sinceUnixSeconds int64 - beforeUnixSeconds int64 - sortUnixSecondsAscOrder bool - limit int -} - -type OpOption func(*Op) - -func (op *Op) applyOpts(opts []OpOption) error { - for _, opt := range opts { - opt(op) - } - - if op.limit < 0 { - return ErrInvalidLimit - } - - return nil -} - -// WithSince sets the since timestamp for the select queries. -// If not specified, it returns all events. -func WithSince(t time.Time) OpOption { - return func(op *Op) { - op.sinceUnixSeconds = t.Unix() - } -} - -// WithBefore sets the before timestamp for the delete queries. -// If not specified, it deletes all events. -func WithBefore(t time.Time) OpOption { - return func(op *Op) { - op.beforeUnixSeconds = t.Unix() - } -} - -// WithSortUnixSecondsAscendingOrder sorts the events by unix_seconds in ascending order, -// meaning its read query returns the oldest events first. -func WithSortUnixSecondsAscendingOrder() OpOption { - return func(op *Op) { - op.sortUnixSecondsAscOrder = true - } -} - -// WithSortUnixSecondsDescendingOrder sorts the events by unix_seconds in descending order, -// meaning its read query returns the newest events first. -func WithSortUnixSecondsDescendingOrder() OpOption { - return func(op *Op) { - op.sortUnixSecondsAscOrder = false - } -} - -func WithLimit(limit int) OpOption { - return func(op *Op) { - op.limit = limit - } -} diff --git a/components/fuse/state/state.go b/components/fuse/state/state.go deleted file mode 100644 index 603174ec..00000000 --- a/components/fuse/state/state.go +++ /dev/null @@ -1,243 +0,0 @@ -package state - -import ( - "context" - "database/sql" - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/leptonai/gpud/log" - "github.com/leptonai/gpud/pkg/sqlite" - - _ "github.com/mattn/go-sqlite3" -) - -const TableNameFUSEConnectionsEventHistory = "components_fuse_connections_event_history" - -const ( - // unix timestamp in seconds when the event was observed - ColumnUnixSeconds = "unix_seconds" - - ColumnDeviceName = "device_name" - ColumnCongestedPercentAgainstThreshold = "congested_percent_against_threshold" - ColumnMaxBackgroundPercentAgainstThreshold = "max_background_percent_against_threshold" -) - -// retain up to 3 days of events -const DefaultRetentionPeriod = 3 * 24 * time.Hour - -type Event struct { - UnixSeconds int64 `json:"unix_seconds"` - DeviceName string `json:"device_name"` - CongestedPercentAgainstThreshold float64 `json:"congested_percent_against_threshold"` - MaxBackgroundPercentAgainstThreshold float64 `json:"max_background_percent_against_threshold"` -} - -func (ev Event) JSON() ([]byte, error) { - return json.Marshal(ev) -} - -func CreateTableFUSEConnectionsEventHistory(ctx context.Context, db *sql.DB) error { - _, err := db.ExecContext(ctx, fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s ( - %s INTEGER NOT NULL, - %s TEXT NOT NULL, - %s REAL, - %s REAL -);`, TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - )) - return err -} - -func InsertEvent(ctx context.Context, db *sql.DB, event Event) error { - log.Logger.Debugw( - "inserting event", - "deviceName", event.DeviceName, - "congestedPercentAgainstThreshold", event.CongestedPercentAgainstThreshold, - "maxBackgroundPercentAgainstThreshold", event.MaxBackgroundPercentAgainstThreshold, - ) - - insertStatement := fmt.Sprintf(` -INSERT OR REPLACE INTO %s (%s, %s, %s, %s) VALUES (?, ?, ?, ?); -`, - TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - ) - - start := time.Now() - _, err := db.ExecContext( - ctx, - insertStatement, - event.UnixSeconds, - event.DeviceName, - event.CongestedPercentAgainstThreshold, - event.MaxBackgroundPercentAgainstThreshold, - ) - sqlite.RecordInsertUpdate(time.Since(start).Seconds()) - - return err -} - -func FindEvent(ctx context.Context, db *sql.DB, unixSeconds int64, devName string) (*Event, error) { - selectStatement := fmt.Sprintf(` -SELECT %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ?; -`, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ColumnDeviceName, - ) - - var foundEvent Event - if err := db.QueryRowContext( - ctx, - selectStatement, - unixSeconds, - devName, - ).Scan( - &foundEvent.UnixSeconds, - &foundEvent.DeviceName, - &foundEvent.CongestedPercentAgainstThreshold, - &foundEvent.MaxBackgroundPercentAgainstThreshold, - ); err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, err - } - - return &foundEvent, nil -} - -// Returns nil if no event is found. -func ReadEvents(ctx context.Context, db *sql.DB, opts ...OpOption) ([]Event, error) { - selectStatement, args, err := createSelectStatementAndArgs(opts...) - if err != nil { - return nil, err - } - - rows, err := db.QueryContext(ctx, selectStatement, args...) - if err != nil { - return nil, err - } - defer rows.Close() - - events := []Event{} - for rows.Next() { - var event Event - if err := rows.Scan( - &event.UnixSeconds, - &event.DeviceName, - &event.CongestedPercentAgainstThreshold, - &event.MaxBackgroundPercentAgainstThreshold, - ); err != nil { - return nil, err - } - events = append(events, event) - } - if err := rows.Err(); err != nil { - return nil, err - } - if len(events) == 0 { - return nil, nil - } - - return events, nil -} - -func createSelectStatementAndArgs(opts ...OpOption) (string, []any, error) { - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return "", nil, err - } - - selectStatement := fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s`, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - TableNameFUSEConnectionsEventHistory, - ) - - args := []any{} - - if op.sinceUnixSeconds > 0 { - selectStatement += "\nWHERE " - selectStatement += fmt.Sprintf("%s >= ?", ColumnUnixSeconds) - args = append(args, op.sinceUnixSeconds) - } - - if op.sortUnixSecondsAscOrder { - selectStatement += "\nORDER BY " + ColumnUnixSeconds + " ASC" - } else { - selectStatement += "\nORDER BY " + ColumnUnixSeconds + " DESC" - } - - if op.limit > 0 { - selectStatement += fmt.Sprintf("\nLIMIT %d", op.limit) - } - - if len(args) == 0 { - return selectStatement, nil, nil - } - return selectStatement, args, nil -} - -func Purge(ctx context.Context, db *sql.DB, opts ...OpOption) (int, error) { - log.Logger.Debugw("purging fuse connections events") - deleteStatement, args, err := createDeleteStatementAndArgs(opts...) - if err != nil { - return 0, err - } - - start := time.Now() - rs, err := db.ExecContext(ctx, deleteStatement, args...) - if err != nil { - return 0, err - } - sqlite.RecordDelete(time.Since(start).Seconds()) - - affected, err := rs.RowsAffected() - if err != nil { - return 0, err - } - return int(affected), nil -} - -// ignores order by and limit -func createDeleteStatementAndArgs(opts ...OpOption) (string, []any, error) { - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return "", nil, err - } - - deleteStatement := fmt.Sprintf(`DELETE FROM %s`, - TableNameFUSEConnectionsEventHistory, - ) - - args := []any{} - - if op.beforeUnixSeconds > 0 { - deleteStatement += " WHERE " - deleteStatement += fmt.Sprintf("%s < ?", ColumnUnixSeconds) - args = append(args, op.beforeUnixSeconds) - } - - if len(args) == 0 { - return deleteStatement, nil, nil - } - return deleteStatement, args, nil -} diff --git a/components/fuse/state/state_test.go b/components/fuse/state/state_test.go deleted file mode 100644 index 4e5a3bbe..00000000 --- a/components/fuse/state/state_test.go +++ /dev/null @@ -1,372 +0,0 @@ -package state - -import ( - "context" - "database/sql" - "fmt" - "os" - "reflect" - "testing" - "time" - - "github.com/leptonai/gpud/pkg/sqlite" -) - -func TestCreateSelectStatement(t *testing.T) { - tests := []struct { - name string - opts []OpOption - want string - wantArgs []any - wantErr bool - }{ - { - name: "no options", - opts: nil, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s DESC`, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with since unix seconds", - opts: []OpOption{WithSince(time.Unix(1234, 0))}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -WHERE %s >= ? -ORDER BY %s DESC`, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ColumnUnixSeconds, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - { - name: "with ascending order", - opts: []OpOption{WithSortUnixSecondsAscendingOrder()}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s ASC`, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with limit", - opts: []OpOption{WithLimit(10)}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s DESC -LIMIT 10`, - ColumnUnixSeconds, - ColumnDeviceName, - ColumnCongestedPercentAgainstThreshold, - ColumnMaxBackgroundPercentAgainstThreshold, - TableNameFUSEConnectionsEventHistory, - ColumnUnixSeconds, - ), - wantArgs: nil, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, args, err := createSelectStatementAndArgs(tt.opts...) - if (err != nil) != tt.wantErr { - t.Errorf("createSelectStatement() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("createSelectStatement() = %v, want %v", got, tt.want) - } - if !reflect.DeepEqual(args, tt.wantArgs) { - t.Errorf("createSelectStatement() args = %v, want %v", args, tt.wantArgs) - } - }) - } -} - -func TestOpenMemory(t *testing.T) { - t.Parallel() - - db, err := sqlite.Open(":memory:") - if err != nil { - t.Fatalf("failed to open database: %v", err) - } - defer db.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := CreateTableFUSEConnectionsEventHistory(ctx, db); err != nil { - t.Fatal("failed to create table:", err) - } -} - -func TestInsertAndFindEvent(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - event := Event{ - UnixSeconds: time.Now().Unix(), - DeviceName: "test_device", - CongestedPercentAgainstThreshold: 75.5, - MaxBackgroundPercentAgainstThreshold: 80.0, - } - - // Test insertion - err := InsertEvent(ctx, db, event) - if err != nil { - t.Errorf("InsertEvent failed: %v", err) - } - - // Test finding the event - foundEvent, err := FindEvent(ctx, db, event.UnixSeconds, event.DeviceName) - if err != nil { - t.Errorf("FindEvent failed: %v", err) - } - if foundEvent == nil { - t.Error("expected to find event, but it wasn't found") - } - if foundEvent != nil && !reflect.DeepEqual(*foundEvent, event) { - t.Errorf("found event doesn't match inserted event. got %+v, want %+v", foundEvent, event) - } - - // Test finding non-existent event - notFoundEvent, err := FindEvent(ctx, db, event.UnixSeconds+1, event.DeviceName) - if err != nil { - t.Errorf("FindEvent failed: %v", err) - } - if notFoundEvent != nil { - t.Error("expected not to find event") - } -} - -func TestReadEvents_NoRows(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // test ReadEvents with empty table - events, err := ReadEvents(ctx, db) - if err != nil { - t.Errorf("expected no error for empty table, got: %v", err) - } - if events != nil { - t.Errorf("expected nil events for empty table, got: %v", events) - } -} - -func TestReadEvents(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - baseTime := time.Now().Unix() - - // Insert test events - testEvents := []Event{ - { - UnixSeconds: baseTime, - DeviceName: "device1", - CongestedPercentAgainstThreshold: 75.5, - MaxBackgroundPercentAgainstThreshold: 80.0, - }, - { - UnixSeconds: baseTime + 1, - DeviceName: "device2", - CongestedPercentAgainstThreshold: 85.5, - MaxBackgroundPercentAgainstThreshold: 90.0, - }, - { - UnixSeconds: baseTime + 2, - DeviceName: "device3", - CongestedPercentAgainstThreshold: 95.5, - MaxBackgroundPercentAgainstThreshold: 100.0, - }, - } - - for _, event := range testEvents { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert test event: %v", err) - } - } - - // test reading all events - events, err := ReadEvents(ctx, db) - if err != nil { - t.Errorf("ReadEvents failed: %v", err) - } - if len(events) != len(testEvents) { - t.Errorf("expected %d events, got %d", len(testEvents), len(events)) - } - - // test reading events with limit - events, err = ReadEvents(ctx, db, WithLimit(2)) - if err != nil { - t.Errorf("ReadEvents with limit failed: %v", err) - } - if len(events) != 2 { - t.Errorf("expected 2 events with limit, got %d", len(events)) - } - - // test reading events since specific time - events, err = ReadEvents(ctx, db, WithSince(time.Unix(baseTime+1, 0))) - if err != nil { - t.Errorf("ReadEvents with since time failed: %v", err) - } - if len(events) != 2 { - t.Errorf("expected 2 events since baseTime+1, got %d", len(events)) - } - - // Test reading events with ascending order - events, err = ReadEvents(ctx, db, WithSortUnixSecondsAscendingOrder()) - if err != nil { - t.Errorf("ReadEvents with ascending order failed: %v", err) - } - if len(events) != 3 || events[0].UnixSeconds > events[len(events)-1].UnixSeconds { - t.Error("Events not properly ordered in ascending order") - } -} - -func TestPurge(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - setup []Event - opts []OpOption - wantErr bool - wantPurged int - wantCount int - }{ - { - name: "delete events before timestamp", - setup: []Event{ - {UnixSeconds: 1000, DeviceName: "device1"}, - {UnixSeconds: 2000, DeviceName: "device2"}, - {UnixSeconds: 3000, DeviceName: "device3"}, - }, - opts: []OpOption{WithBefore(time.Unix(2500, 0))}, - wantPurged: 2, - wantCount: 1, - }, - { - name: "delete all events", - setup: []Event{ - {UnixSeconds: 1000, DeviceName: "device1"}, - {UnixSeconds: 2000, DeviceName: "device2"}, - }, - opts: []OpOption{}, - wantPurged: 2, - wantCount: 0, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Insert test data - for _, event := range tt.setup { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert test event: %v", err) - } - } - - // perform deletion - purged, err := Purge(ctx, db, tt.opts...) - if (err != nil) != tt.wantErr { - t.Errorf("Purge() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if purged != tt.wantPurged { - t.Errorf("Purge() purged = %v, want %v", purged, tt.wantPurged) - } - - // verify results - events, err := ReadEvents(ctx, db) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - if events == nil { - if tt.wantCount != 0 { - t.Errorf("expected %d events, got nil", tt.wantCount) - } - } else if len(events) != tt.wantCount { - t.Errorf("expected %d events, got %d", tt.wantCount, len(events)) - } - }) - } -} - -func setupTestDB(t *testing.T) (*sql.DB, func()) { - tmpfile, err := os.CreateTemp("", "test-fuse-*.db") - if err != nil { - t.Fatalf("failed to create temp file: %v", err) - } - tmpfile.Close() - - db, err := sqlite.Open(tmpfile.Name()) - if err != nil { - os.Remove(tmpfile.Name()) - t.Fatalf("failed to open database: %v", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := CreateTableFUSEConnectionsEventHistory(ctx, db); err != nil { - db.Close() - os.Remove(tmpfile.Name()) - t.Fatalf("failed to create table: %v", err) - } - - cleanup := func() { - db.Close() - os.Remove(tmpfile.Name()) - } - return db, cleanup -} diff --git a/internal/server/server.go b/internal/server/server.go index 276c265d..b7c833c0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -17,6 +17,7 @@ import ( "net/http/pprof" goOS "os" "path" + "runtime" "strings" "syscall" "time" @@ -50,7 +51,6 @@ import ( nvidia_gsp_firmware_mode_id "github.com/leptonai/gpud/components/accelerator/nvidia/gsp-firmware-mode/id" nvidia_hw_slowdown "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown" nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" nvidia_infiniband "github.com/leptonai/gpud/components/accelerator/nvidia/infiniband" nvidia_infiniband_id "github.com/leptonai/gpud/components/accelerator/nvidia/infiniband/id" nvidia_info "github.com/leptonai/gpud/components/accelerator/nvidia/info" @@ -77,6 +77,7 @@ import ( containerd_pod_id "github.com/leptonai/gpud/components/containerd/pod/id" "github.com/leptonai/gpud/components/cpu" cpu_id "github.com/leptonai/gpud/components/cpu/id" + events_db "github.com/leptonai/gpud/components/db" "github.com/leptonai/gpud/components/disk" disk_id "github.com/leptonai/gpud/components/disk/id" "github.com/leptonai/gpud/components/dmesg" @@ -88,7 +89,6 @@ import ( file_id "github.com/leptonai/gpud/components/file/id" "github.com/leptonai/gpud/components/fuse" fuse_id "github.com/leptonai/gpud/components/fuse/id" - fuse_state "github.com/leptonai/gpud/components/fuse/state" "github.com/leptonai/gpud/components/info" info_id "github.com/leptonai/gpud/components/info/id" k8s_pod "github.com/leptonai/gpud/components/k8s/pod" @@ -232,29 +232,6 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID } }() - if err := fuse_state.CreateTableFUSEConnectionsEventHistory(ctx, dbRW); err != nil { - return nil, fmt.Errorf("failed to create fuse connections state table: %w", err) - } - go func() { - dur := fuse_state.DefaultRetentionPeriod - for { - select { - case <-ctx.Done(): - return - case <-time.After(dur): - now := time.Now().UTC() - before := now.Add(-dur) - - purged, err := fuse_state.Purge(ctx, dbRW, fuse_state.WithBefore(before)) - if err != nil { - log.Logger.Warnw("failed to delete FUSE connections events", "error", err) - } else { - log.Logger.Debugw("deleted FUSE connections events", "before", before, "purged", purged) - } - } - } - }() - // create nvidia-specific table regardless of whether nvidia components are enabled if err := nvidia_xid_sxid_state.CreateTableXidSXidEventHistory(ctx, dbRW); err != nil { return nil, fmt.Errorf("failed to create nvidia xid/sxid state table: %w", err) @@ -279,28 +256,43 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID } }() - if err := nvidia_hw_slowdown_state.CreateTable(ctx, dbRW); err != nil { - return nil, fmt.Errorf("failed to create nvidia clock events table: %w", err) + nvidiaInstalled, err := nvidia_query.GPUsInstalled(ctx) + if err != nil { + return nil, err } - go func() { - dur := nvidia_hw_slowdown_state.DefaultRetentionPeriod - for { - select { - case <-ctx.Done(): - return - case <-time.After(dur): - now := time.Now().UTC() - before := now.Add(-dur) - purged, err := nvidia_hw_slowdown_state.Purge(ctx, dbRW, nvidia_hw_slowdown_state.WithBefore(before)) - if err != nil { - log.Logger.Warnw("failed to delete nvidia clock events", "error", err) - } else { - log.Logger.Debugw("deleted nvidia clock events", "before", before, "purged", purged) - } - } + var eventsStoreNvidiaErrorXid events_db.Store + var eventsStoreNvidiaHWSlowdown events_db.Store + if runtime.GOOS == "linux" && nvidiaInstalled { + eventsStoreNvidiaErrorXid, err = events_db.NewStore( + dbRW, + dbRO, + events_db.CreateDefaultTableName(nvidia_component_error_xid_id.Name), + 3*24*time.Hour, + ) + if err != nil { + return nil, err } - }() + + eventsStoreNvidiaHWSlowdown, err = events_db.NewStore( + dbRW, + dbRO, + events_db.CreateDefaultTableName(nvidia_hw_slowdown_id.Name), + 3*24*time.Hour, + ) + if err != nil { + return nil, err + } + + nvidia_query.SetDefaultPoller( + nvidia_query.WithXidEventsStore(eventsStoreNvidiaErrorXid), + nvidia_query.WithHWSlowdownEventsStore(eventsStoreNvidiaHWSlowdown), + nvidia_query.WithNvidiaSMICommand(options.NvidiaSMICommand), + nvidia_query.WithNvidiaSMIQueryCommand(options.NvidiaSMIQueryCommand), + nvidia_query.WithIbstatCommand(options.IbstatCommand), + nvidia_query.WithInfinibandClassDirectory(options.InfinibandClassDirectory), + ) + } if err := pci_state.CreateTable(ctx, dbRW); err != nil { return nil, fmt.Errorf("failed to create pci state table: %w", err) @@ -510,7 +502,11 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("failed to validate component %s config: %w", k, err) } - allComponents = append(allComponents, fuse.New(ctx, cfg)) + c, err := fuse.New(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create component %s: %w", k, err) + } + allComponents = append(allComponents, c) case pci_id.Name: cfg := pci.Config{Query: defaultQueryCfg} @@ -812,7 +808,7 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("failed to validate component %s config: %w", k, err) } - allComponents = append(allComponents, nvidia_hw_slowdown.New(ctx, cfg)) + allComponents = append(allComponents, nvidia_hw_slowdown.New(ctx, cfg, eventsStoreNvidiaHWSlowdown)) case nvidia_clock_speed_id.Name: cfg := nvidia_common.Config{Query: defaultQueryCfg, ToolOverwrites: options.ToolOverwrites} diff --git a/pkg/fuse/fuse.go b/pkg/fuse/fuse.go index 448ed51a..b68113cf 100644 --- a/pkg/fuse/fuse.go +++ b/pkg/fuse/fuse.go @@ -2,6 +2,7 @@ package fuse import ( + "encoding/json" "fmt" "io" "os" @@ -51,6 +52,10 @@ type ConnectionInfo struct { Waiting int `json:"waiting"` } +func (info ConnectionInfo) JSON() ([]byte, error) { + return json.Marshal(info) +} + type ConnectionInfos []ConnectionInfo func (infos ConnectionInfos) RenderTable(wr io.Writer) {