diff --git a/components/accelerator/nvidia/hw-slowdown/component.go b/components/accelerator/nvidia/hw-slowdown/component.go index 016d30cd..bea968b6 100644 --- a/components/accelerator/nvidia/hw-slowdown/component.go +++ b/components/accelerator/nvidia/hw-slowdown/component.go @@ -5,22 +5,20 @@ import ( "context" "database/sql" "fmt" - "strings" "time" + "github.com/dustin/go-humanize" + "github.com/prometheus/client_golang/prometheus" + "github.com/leptonai/gpud/components" nvidia_common "github.com/leptonai/gpud/components/accelerator/nvidia/common" nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query" nvidia_query_metrics_clock "github.com/leptonai/gpud/components/accelerator/nvidia/query/metrics/clock" "github.com/leptonai/gpud/components/common" + events_db "github.com/leptonai/gpud/components/db" "github.com/leptonai/gpud/components/query" "github.com/leptonai/gpud/log" - - "github.com/dustin/go-humanize" - "github.com/prometheus/client_golang/prometheus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -33,7 +31,7 @@ const ( DefaultStateHWSlowdownEventsThresholdFrequencyPerMinute = 0.6 ) -func New(ctx context.Context, cfg nvidia_common.Config) (components.Component, error) { +func New(ctx context.Context, cfg nvidia_common.Config, eventsStore events_db.Store) (components.Component, error) { if nvidia_query.GetDefaultPoller() == nil { return nil, nvidia_query.ErrDefaultPollerNotSet } @@ -51,21 +49,7 @@ func New(ctx context.Context, cfg nvidia_common.Config) (components.Component, e cancel: ccancel, poller: nvidia_query.GetDefaultPoller(), - readEvents: func(ctx context.Context, since time.Time) ([]nvidia_hw_slowdown_state.Event, error) { - // the default nvidia poller persists the events to the storage - // so we can just read from the storage - return nvidia_hw_slowdown_state.ReadEvents( - ctx, - cfg.Query.State.DBRO, - - nvidia_hw_slowdown_state.WithSince(since), - - // in order to dedup nvidia-smi events and prioritize nvml events - // otherwise, we have deduplicate objects from nvml and nvidia-smi - // deprecate this once we removed nvidia-smi dependency - nvidia_hw_slowdown_state.WithDedupDataSource(true), - ) - }, + eventsStore: eventsStore, }, nil } @@ -80,7 +64,7 @@ type component struct { poller query.Poller gatherer prometheus.Gatherer - readEvents func(ctx context.Context, since time.Time) ([]nvidia_hw_slowdown_state.Event, error) + eventsStore events_db.Store } func (c *component) Name() string { return nvidia_hw_slowdown_id.Name } @@ -104,7 +88,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { since := time.Now().UTC().Add(-c.stateHWSlowdownEvaluationWindow) - events, err := c.readEvents(ctx, since) + events, err := c.eventsStore.Get(ctx, since) if err != nil { return nil, err } @@ -121,7 +105,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { eventsByMinute := make(map[int]struct{}) for _, event := range events { - min := int(event.Timestamp / 60) // unix seconds to minutes + min := int(event.Time.Unix() / 60) // unix seconds to minutes eventsByMinute[min] = struct{}{} } @@ -157,36 +141,8 @@ func (c *component) States(ctx context.Context) ([]components.State, error) { }, nil } -const ( - EventNameHWSlowdown = "hw_slowdown" - EventKeyGPUUUID = "gpu_uuid" -) - func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) { - events, err := c.readEvents(ctx, since) - if err != nil { - return nil, err - } - - if len(events) == 0 { - log.Logger.Debugw("no event found for /events", "component", c.Name(), "since", humanize.Time(since)) - return nil, nil - } - - log.Logger.Debugw("found events", "component", c.Name(), "since", humanize.Time(since), "count", len(events)) - convertedEvents := make([]components.Event, 0, len(events)) - for _, event := range events { - convertedEvents = append(convertedEvents, components.Event{ - Time: metav1.Time{Time: time.Unix(event.Timestamp, 0).UTC()}, - Name: EventNameHWSlowdown, - Type: common.EventTypeWarning, - Message: strings.Join(event.Reasons, ", "), - ExtraInfo: map[string]string{ - EventKeyGPUUUID: event.GPUUUID, - }, - }) - } - return convertedEvents, nil + return c.eventsStore.Get(ctx, since) } func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) { diff --git a/components/accelerator/nvidia/hw-slowdown/component_test.go b/components/accelerator/nvidia/hw-slowdown/component_test.go index 9bd0a24d..f6a2675b 100644 --- a/components/accelerator/nvidia/hw-slowdown/component_test.go +++ b/components/accelerator/nvidia/hw-slowdown/component_test.go @@ -5,12 +5,18 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/leptonai/gpud/components" nvidia_common "github.com/leptonai/gpud/components/accelerator/nvidia/common" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" + nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query" + "github.com/leptonai/gpud/components/common" + "github.com/leptonai/gpud/components/db" + events_db "github.com/leptonai/gpud/components/db" "github.com/leptonai/gpud/pkg/sqlite" - - "github.com/stretchr/testify/assert" ) func TestComponentWithNoPoller(t *testing.T) { @@ -18,7 +24,7 @@ func TestComponentWithNoPoller(t *testing.T) { defer cancel() defaultPoller := nvidia_query.GetDefaultPoller() - _, err := New(ctx, nvidia_common.Config{}) + _, err := New(ctx, nvidia_common.Config{}, nil) if defaultPoller != nil { // expects no error @@ -38,7 +44,7 @@ func TestComponentStates(t *testing.T) { name string window time.Duration thresholdPerMinute float64 - insertedEvent []nvidia_hw_slowdown_state.Event + insertedEvent []components.Event expectedStates int expectHealthy bool }{ @@ -46,18 +52,52 @@ func TestComponentStates(t *testing.T) { name: "single event within window", window: 10 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{{Timestamp: now.Add(-5 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}}, - expectedStates: 1, - expectHealthy: true, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-5 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + }, + expectedStates: 1, + expectHealthy: true, }, { name: "multiple events within window but below threshold", window: 10 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-5 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-1"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-2"}}, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-5 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, }, expectedStates: 1, expectHealthy: true, @@ -66,11 +106,43 @@ func TestComponentStates(t *testing.T) { name: "events above threshold", window: 5 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-1"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-2"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-3"}}, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, }, expectedStates: 1, expectHealthy: false, @@ -79,26 +151,155 @@ func TestComponentStates(t *testing.T) { name: "events above threshold with multiple GPUs", window: 5 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-4 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, - - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-3 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, - - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-2 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, - - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-1", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-2", Reasons: []string{"reason"}}, - {Timestamp: now.Add(-1 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-3", Reasons: []string{"reason"}}, + insertedEvent: []components.Event{ + // GPU 0-3 events at -4 minutes + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-4 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-3", + }, + }, + // GPU 0-3 events at -3 minutes + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-3", + }, + }, + // GPU 0-3 events at -2 minutes + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-2 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-3", + }, + }, + // GPU 0-3 events at -1 minutes + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-1", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-2", + }, + }, + { + Time: metav1.Time{Time: now.Add(-1 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-3", + }, + }, }, expectedStates: 1, expectHealthy: false, @@ -107,9 +308,25 @@ func TestComponentStates(t *testing.T) { name: "events outside window", window: 5 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{ - {Timestamp: now.Add(-10 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-0"}}, - {Timestamp: now.Add(-8 * time.Minute).Unix(), DataSource: "nvml", GPUUUID: "gpu-0", Reasons: []string{"reason-1"}}, + insertedEvent: []components.Event{ + { + Time: metav1.Time{Time: now.Add(-10 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: now.Add(-8 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, }, expectedStates: 1, expectHealthy: true, @@ -118,7 +335,7 @@ func TestComponentStates(t *testing.T) { name: "no events", window: 10 * time.Minute, thresholdPerMinute: 0.6, - insertedEvent: []nvidia_hw_slowdown_state.Event{}, + insertedEvent: []components.Event{}, expectedStates: 1, expectHealthy: true, }, @@ -133,43 +350,246 @@ func TestComponentStates(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - if err := nvidia_hw_slowdown_state.CreateTable(ctx, dbRW); err != nil { - t.Fatalf("failed to create table: %v", err) - } + store, err := db.NewStore(dbRW, dbRO, "test_events", 0) + assert.NoError(t, err) + defer store.Close() if len(tc.insertedEvent) > 0 { for _, event := range tc.insertedEvent { - if err := nvidia_hw_slowdown_state.InsertEvent(ctx, dbRW, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } + err := store.Insert(ctx, event) + assert.NoError(t, err) } } c := &component{ stateHWSlowdownEvaluationWindow: tc.window, stateHWSlowdownEventsThresholdFrequencyPerMinute: tc.thresholdPerMinute, - readEvents: func(ctx context.Context, since time.Time) ([]nvidia_hw_slowdown_state.Event, error) { - return nvidia_hw_slowdown_state.ReadEvents( - ctx, - dbRO, - nvidia_hw_slowdown_state.WithSince(now.Add(-tc.window)), - nvidia_hw_slowdown_state.WithDedupDataSource(true), - ) - }, + eventsStore: store, } states, err := c.States(ctx) - if err != nil { - t.Fatalf("failed to get states: %v", err) + assert.NoError(t, err) + assert.Equal(t, tc.expectedStates, len(states)) + + if len(states) > 0 { + assert.Equal(t, tc.expectHealthy, states[0].Healthy) } + }) + } +} + +func TestComponentRegisterCollectors(t *testing.T) { + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + reg := prometheus.NewRegistry() + c := &component{} - if len(states) != tc.expectedStates { - t.Fatalf("expected %d states, got %d", tc.expectedStates, len(states)) + err := c.RegisterCollectors(reg, dbRW, dbRO, "test_metrics") + assert.NoError(t, err) + assert.Equal(t, reg, c.gatherer) +} + +func TestComponentStatesEdgeCases(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + window time.Duration + thresholdPerMinute float64 + setupStore func(store events_db.Store, ctx context.Context) error + expectError bool + expectedStates int + expectHealthy bool + }{ + { + name: "zero evaluation window", + window: 0, + thresholdPerMinute: 0.6, + setupStore: func(store events_db.Store, ctx context.Context) error { return nil }, + expectError: false, + expectedStates: 1, + expectHealthy: true, + }, + { + name: "negative evaluation window", + window: -10 * time.Minute, + thresholdPerMinute: 0.6, + setupStore: func(store events_db.Store, ctx context.Context) error { return nil }, + expectError: false, + expectedStates: 1, + expectHealthy: true, + }, + { + name: "zero threshold", + window: 10 * time.Minute, + thresholdPerMinute: 0, + setupStore: func(store events_db.Store, ctx context.Context) error { + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC().Add(-5 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + } + return store.Insert(ctx, event) + }, + expectError: false, + expectedStates: 1, + expectHealthy: false, + }, + { + name: "negative threshold", + window: 10 * time.Minute, + thresholdPerMinute: -0.6, + setupStore: func(store events_db.Store, ctx context.Context) error { + event := components.Event{ + Time: metav1.Time{Time: time.Now().UTC().Add(-5 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + } + return store.Insert(ctx, event) + }, + expectError: false, + expectedStates: 1, + expectHealthy: false, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := db.NewStore(dbRW, dbRO, "test_events", 0) + assert.NoError(t, err) + defer store.Close() + + err = tc.setupStore(store, ctx) + assert.NoError(t, err) + + c := &component{ + stateHWSlowdownEvaluationWindow: tc.window, + stateHWSlowdownEventsThresholdFrequencyPerMinute: tc.thresholdPerMinute, + eventsStore: store, } - if len(states) > 0 && states[0].Healthy != tc.expectHealthy { - t.Errorf("expected healthy=%v, got %v", tc.expectHealthy, states[0].Healthy) + states, err := c.States(ctx) + if tc.expectError { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tc.expectedStates, len(states)) + if len(states) > 0 { + assert.Equal(t, tc.expectHealthy, states[0].Healthy) } }) } } + +func TestComponentName(t *testing.T) { + t.Parallel() + c := &component{} + assert.Equal(t, nvidia_hw_slowdown_id.Name, c.Name()) +} + +func TestComponentStart(t *testing.T) { + t.Parallel() + c := &component{} + assert.NoError(t, c.Start()) +} + +func TestComponentEvents(t *testing.T) { + t.Parallel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + store, err := db.NewStore(dbRW, dbRO, "test_events", 0) + assert.NoError(t, err) + defer store.Close() + + // Insert test events + testEvents := []components.Event{ + { + Time: metav1.Time{Time: time.Now().UTC().Add(-5 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-0", + }, + }, + { + Time: metav1.Time{Time: time.Now().UTC().Add(-3 * time.Minute)}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown detected", + ExtraInfo: map[string]string{ + "gpu_uuid": "gpu-1", + }, + }, + } + + for _, event := range testEvents { + err := store.Insert(ctx, event) + assert.NoError(t, err) + } + + c := &component{ + eventsStore: store, + } + + // Test getting events since a specific time + since := time.Now().UTC().Add(-10 * time.Minute) + events, err := c.Events(ctx, since) + assert.NoError(t, err) + assert.Equal(t, len(testEvents), len(events)) + + // Test getting events with more recent time + since = time.Now().UTC().Add(-4 * time.Minute) + events, err = c.Events(ctx, since) + assert.NoError(t, err) + assert.Equal(t, 1, len(events)) +} + +func TestComponentMetrics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + dbRW, dbRO, cleanup := sqlite.OpenTestDB(t) + defer cleanup() + + reg := prometheus.NewRegistry() + c := &component{} + + err := c.RegisterCollectors(reg, dbRW, dbRO, "test_metrics") + assert.NoError(t, err) + assert.Equal(t, reg, c.gatherer) + + since := time.Now().UTC().Add(-10 * time.Minute) + metrics, err := c.Metrics(ctx, since) + + // Since we don't have a mock for nvidia_query_metrics_clock functions, + // we expect an error or empty metrics + if err != nil { + assert.Error(t, err) + } else { + assert.Empty(t, metrics) + } +} diff --git a/components/accelerator/nvidia/hw-slowdown/state/options.go b/components/accelerator/nvidia/hw-slowdown/state/options.go deleted file mode 100644 index 6802266e..00000000 --- a/components/accelerator/nvidia/hw-slowdown/state/options.go +++ /dev/null @@ -1,77 +0,0 @@ -package state - -import ( - "errors" - "time" -) - -var ErrInvalidLimit = errors.New("limit must be greater than or equal to 0") - -type Op struct { - sinceUnixSeconds int64 - beforeUnixSeconds int64 - sortUnixSecondsAscOrder bool - limit int - dedupDataSource bool -} - -type OpOption func(*Op) - -func (op *Op) applyOpts(opts []OpOption) error { - for _, opt := range opts { - opt(op) - } - - if op.limit < 0 { - return ErrInvalidLimit - } - - return nil -} - -// WithSince sets the since timestamp for the select queries. -// If not specified, it returns all events. -func WithSince(t time.Time) OpOption { - return func(op *Op) { - op.sinceUnixSeconds = t.Unix() - } -} - -// WithBefore sets the before timestamp for the delete queries. -// If not specified, it deletes all events. -func WithBefore(t time.Time) OpOption { - return func(op *Op) { - op.beforeUnixSeconds = t.Unix() - } -} - -// WithSortUnixSecondsAscendingOrder sorts the events by unix_seconds in ascending order, -// meaning its read query returns the oldest events first. -func WithSortUnixSecondsAscendingOrder() OpOption { - return func(op *Op) { - op.sortUnixSecondsAscOrder = true - } -} - -// WithSortUnixSecondsDescendingOrder sorts the events by unix_seconds in descending order, -// meaning its read query returns the newest events first. -func WithSortUnixSecondsDescendingOrder() OpOption { - return func(op *Op) { - op.sortUnixSecondsAscOrder = false - } -} - -func WithLimit(limit int) OpOption { - return func(op *Op) { - op.limit = limit - } -} - -// Set true to deduplicate events by data sources. -// Meaning, out of nvml and nvidia-smi, if there are events at the same time, -// only the one from nvml will be kept. -func WithDedupDataSource(dedup bool) OpOption { - return func(op *Op) { - op.dedupDataSource = dedup - } -} diff --git a/components/accelerator/nvidia/hw-slowdown/state/state.go b/components/accelerator/nvidia/hw-slowdown/state/state.go deleted file mode 100644 index 327fc473..00000000 --- a/components/accelerator/nvidia/hw-slowdown/state/state.go +++ /dev/null @@ -1,324 +0,0 @@ -// Package state provides the persistent storage layer for the hw slowdown events. -package state - -import ( - "context" - "database/sql" - "encoding/json" - "errors" - "fmt" - "reflect" - "sort" - "time" - - "github.com/leptonai/gpud/log" - "github.com/leptonai/gpud/pkg/sqlite" - - _ "github.com/mattn/go-sqlite3" -) - -const ( - DeprecatedTableNameClockEvents = "components_accelerator_nvidia_query_clock_events" - TableNameHWSlowdown = "components_accelerator_nvidia_hw_slowdown_events" -) - -const ( - // unix timestamp in seconds when the event was observed - ColumnUnixSeconds = "unix_seconds" - - // either "nvml" or "dmesg" - ColumnDataSource = "data_source" - - // gpu uuid - ColumnGPUUUID = "gpu_uuid" - - // reasons for clock events - ColumnReasons = "reasons" -) - -// retain up to 3 days of events -const DefaultRetentionPeriod = 3 * 24 * time.Hour - -type Event struct { - Timestamp int64 - DataSource string - GPUUUID string - Reasons []string -} - -func CreateTable(ctx context.Context, db *sql.DB) error { - _, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", DeprecatedTableNameClockEvents)) - if err != nil { - return err - } - - _, err = db.ExecContext(ctx, fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s ( - %s INTEGER NOT NULL, - %s TEXT NOT NULL, - %s TEXT NOT NULL, - %s TEXT NOT NULL -);`, TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - )) - return err -} - -func InsertEvent(ctx context.Context, db *sql.DB, event Event) error { - log.Logger.Debugw("inserting event", "dataSource", event.DataSource, "uuid", event.GPUUUID, "reasons", event.Reasons) - - insertStatement := fmt.Sprintf(` -INSERT OR REPLACE INTO %s (%s, %s, %s, %s) VALUES (?, ?, ?, NULLIF(?, '')); -`, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - ) - - reasonsBytes, err := json.Marshal(event.Reasons) - if err != nil { - return err - } - - start := time.Now() - _, err = db.ExecContext( - ctx, - insertStatement, - event.Timestamp, - event.DataSource, - event.GPUUUID, - string(reasonsBytes), - ) - sqlite.RecordInsertUpdate(time.Since(start).Seconds()) - - return err -} - -func FindEvent(ctx context.Context, db *sql.DB, event Event) (bool, error) { - selectStatement := fmt.Sprintf(` -SELECT %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ? AND %s = ?; -`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ) - - start := time.Now() - var foundEvent Event - var reasonsRaw string - err := db.QueryRowContext( - ctx, - selectStatement, - event.Timestamp, - event.DataSource, - event.GPUUUID, - ).Scan( - &foundEvent.Timestamp, - &foundEvent.DataSource, - &foundEvent.GPUUUID, - &reasonsRaw, - ) - sqlite.RecordSelect(time.Since(start).Seconds()) - - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return false, nil - } - return false, err - } - if err := json.Unmarshal([]byte(reasonsRaw), &foundEvent.Reasons); err != nil { - return false, err - } - - sort.Strings(foundEvent.Reasons) - sort.Strings(event.Reasons) - - // event at the same time but with different details - if len(foundEvent.Reasons) > 0 && !reflect.DeepEqual(foundEvent.Reasons, event.Reasons) { - return false, nil - } - - // found event - // e.g., same messages in dmesg - return true, nil -} - -// Returns nil if no event is found. -func ReadEvents(ctx context.Context, db *sql.DB, opts ...OpOption) ([]Event, error) { - selectStatement, args, err := createSelectStatementAndArgs(opts...) - if err != nil { - return nil, err - } - - rows, err := db.QueryContext(ctx, selectStatement, args...) - if err != nil { - return nil, err - } - defer rows.Close() - - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return nil, err - } - - // data sources can be multiple (e.g., nvml and nvidia-smi) - // here, we prioritize nvml over nvidia-smi - // for the same unix second, only the one from nvml will be kept - unixToUUIDToDataSource := make(map[int64]map[string]string, 0) - - events := []Event{} - for rows.Next() { - var event Event - var reasonsRaw string - if err := rows.Scan( - &event.Timestamp, - &event.DataSource, - &event.GPUUUID, - &reasonsRaw, - ); err != nil { - return nil, err - } - - _, ok := unixToUUIDToDataSource[event.Timestamp] - if !ok { - unixToUUIDToDataSource[event.Timestamp] = make(map[string]string, 0) - } - - prevDataSource, ok := unixToUUIDToDataSource[event.Timestamp][event.GPUUUID] - currDataSource := event.DataSource - - toInclude := true - if !ok { - // this GPU had NO PREVIOUS event at this unix second ("nvml" or "nvidia-smi") - unixToUUIDToDataSource[event.Timestamp][event.GPUUUID] = currDataSource - } else { - // this GPU HAD A PREVIOUS event at this unix second BUT with different data source - - // this works because we prioritize "nvml" over "nvidia-smi" in the SQL select statement - // "ORDER BY data_source DESC" where "nvml" comes before "nvidia-smi" - if op.dedupDataSource && prevDataSource == "nvml" { - // already had an event with the prioritized "nvml" data source - // thus we DO NOT need to return another event with the other data source ("nvidia-smi") - toInclude = false - } - } - - if !toInclude { - continue - } - - if err := json.Unmarshal([]byte(reasonsRaw), &event.Reasons); err != nil { - return nil, err - } - events = append(events, event) - } - if err := rows.Err(); err != nil { - return nil, err - } - if len(events) == 0 { - return nil, nil - } - - return events, nil -} - -func createSelectStatementAndArgs(opts ...OpOption) (string, []any, error) { - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return "", nil, err - } - - selectStatement := fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ) - - args := []any{} - - if op.sinceUnixSeconds > 0 { - selectStatement += "\nWHERE " - selectStatement += fmt.Sprintf("%s >= ?", ColumnUnixSeconds) - args = append(args, op.sinceUnixSeconds) - } - - // sort by unix seconds and data source - // data source is sorted in reverse order so that "nvml" returns before "nvidia-smi" - // for the same unix second and event type - selectStatement += "\nORDER BY " + ColumnUnixSeconds - if op.sortUnixSecondsAscOrder { - selectStatement += " ASC" - } else { - selectStatement += " DESC" - } - selectStatement += ", " + ColumnDataSource + " DESC" - - if op.limit > 0 { - selectStatement += fmt.Sprintf("\nLIMIT %d", op.limit) - } - - if len(args) == 0 { - return selectStatement, nil, nil - } - return selectStatement, args, nil -} - -func Purge(ctx context.Context, db *sql.DB, opts ...OpOption) (int, error) { - log.Logger.Debugw("purging nvidia clock events") - deleteStatement, args, err := createDeleteStatementAndArgs(opts...) - if err != nil { - return 0, err - } - - start := time.Now() - rs, err := db.ExecContext(ctx, deleteStatement, args...) - sqlite.RecordDelete(time.Since(start).Seconds()) - - if err != nil { - return 0, err - } - - affected, err := rs.RowsAffected() - if err != nil { - return 0, err - } - return int(affected), nil -} - -// ignores order by and limit -func createDeleteStatementAndArgs(opts ...OpOption) (string, []any, error) { - op := &Op{} - if err := op.applyOpts(opts); err != nil { - return "", nil, err - } - - deleteStatement := fmt.Sprintf(`DELETE FROM %s`, - TableNameHWSlowdown, - ) - - args := []any{} - - if op.beforeUnixSeconds > 0 { - deleteStatement += " WHERE " - deleteStatement += fmt.Sprintf("%s < ?", ColumnUnixSeconds) - args = append(args, op.beforeUnixSeconds) - } - - if len(args) == 0 { - return deleteStatement, nil, nil - } - return deleteStatement, args, nil -} diff --git a/components/accelerator/nvidia/hw-slowdown/state/state_test.go b/components/accelerator/nvidia/hw-slowdown/state/state_test.go deleted file mode 100644 index f1ab524f..00000000 --- a/components/accelerator/nvidia/hw-slowdown/state/state_test.go +++ /dev/null @@ -1,767 +0,0 @@ -package state - -import ( - "context" - "database/sql" - "fmt" - "os" - "reflect" - "testing" - "time" - - "github.com/leptonai/gpud/pkg/sqlite" -) - -func TestCreateSelectStatement(t *testing.T) { - tests := []struct { - name string - opts []OpOption - want string - wantArgs []any - wantErr bool - }{ - { - name: "no options", - opts: nil, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s DESC, %s DESC`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with since unix seconds", - opts: []OpOption{WithSince(time.Unix(1234, 0))}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -WHERE %s >= ? -ORDER BY %s DESC, %s DESC`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - { - name: "with ascending order", - opts: []OpOption{WithSortUnixSecondsAscendingOrder()}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s ASC, %s DESC`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with limit", - opts: []OpOption{WithLimit(10)}, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -ORDER BY %s DESC, %s DESC -LIMIT 10`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with all options", - opts: []OpOption{ - WithSince(time.Unix(1234, 0)), - WithSortUnixSecondsAscendingOrder(), - WithLimit(10), - }, - want: fmt.Sprintf(`SELECT %s, %s, %s, %s -FROM %s -WHERE %s >= ? -ORDER BY %s ASC, %s DESC -LIMIT 10`, - ColumnUnixSeconds, - ColumnDataSource, - ColumnGPUUUID, - ColumnReasons, - TableNameHWSlowdown, - ColumnUnixSeconds, - ColumnUnixSeconds, - ColumnDataSource, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, args, err := createSelectStatementAndArgs(tt.opts...) - if (err != nil) != tt.wantErr { - t.Errorf("createSelectStatement() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("createSelectStatement() = %v, want %v", got, tt.want) - } - if !reflect.DeepEqual(args, tt.wantArgs) { - t.Errorf("createSelectStatement() args = %v, want %v", args, tt.wantArgs) - } - }) - } -} - -func TestOpenMemory(t *testing.T) { - t.Parallel() - - db, err := sqlite.Open(":memory:") - if err != nil { - t.Fatalf("failed to open database: %v", err) - } - defer db.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := CreateTable(ctx, db); err != nil { - t.Fatal("failed to create table:", err) - } -} - -func TestInsertAndFindEvent(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - event := Event{ - Timestamp: time.Now().Unix(), - DataSource: "nvml", - GPUUUID: "31", - Reasons: []string{"GPU has fallen off the bus"}, - } - - // Test insertion - err := InsertEvent(ctx, db, event) - if err != nil { - t.Errorf("InsertEvent failed: %v", err) - } - - // Test finding the event - found, err := FindEvent(ctx, db, event) - if err != nil { - t.Errorf("FindEvent failed: %v", err) - } - if !found { - t.Error("expected to find event, but it wasn't found") - } - - // Test finding event with different details - eventDiffDetails := event - eventDiffDetails.Reasons = []string{"Different details"} - found, err = FindEvent(ctx, db, eventDiffDetails) - if err != nil { - t.Errorf("FindEvent with different details failed: %v", err) - } - if found { - t.Error("expected not to find event with different details") - } -} - -func TestReadEvents_NoRows(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // test ReadEvents with empty table - events, err := ReadEvents(ctx, db) - if err != nil { - t.Errorf("expected no error for empty table, got: %v", err) - } - if events != nil { - t.Errorf("expected nil events for empty table, got: %v", events) - } -} - -func TestReadEvents(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - baseTime := time.Now().Unix() - - // Insert test events - testEvents := []Event{ - { - Timestamp: baseTime, - DataSource: "nvml", - GPUUUID: "31", - Reasons: []string{"First event"}, - }, - { - Timestamp: baseTime + 1, - DataSource: "nvidia-smi", - GPUUUID: "32", - Reasons: []string{"Second event"}, - }, - { - Timestamp: baseTime + 2, - DataSource: "nvml", - GPUUUID: "33", - Reasons: []string{"Third event"}, - }, - } - - for _, event := range testEvents { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert test event: %v", err) - } - } - - // test reading all events - events, err := ReadEvents(ctx, db) - if err != nil { - t.Errorf("ReadEvents failed: %v", err) - } - if len(events) != len(testEvents) { - t.Errorf("expected %d events, got %d", len(testEvents), len(events)) - } - - // test reading events with limit - events, err = ReadEvents(ctx, db, WithLimit(2)) - if err != nil { - t.Errorf("ReadEvents with limit failed: %v", err) - } - if len(events) != 2 { - t.Errorf("expected 2 events with limit, got %d", len(events)) - } - - // test reading events since specific time - events, err = ReadEvents(ctx, db, WithSince(time.Unix(baseTime+1, 0))) - if err != nil { - t.Errorf("ReadEvents with since time failed: %v", err) - } - - t.Logf("searching for events since: %d", baseTime+1) - for _, e := range events { - t.Logf("Found event with timestamp: %d", e.Timestamp) - } - - if len(events) != 2 { - t.Errorf("expected 2 events since baseTime+1, got %d", len(events)) - } - - // Test reading events with ascending order - events, err = ReadEvents(ctx, db, WithSortUnixSecondsAscendingOrder()) - if err != nil { - t.Errorf("ReadEvents with ascending order failed: %v", err) - } - if len(events) != 3 || events[0].Timestamp > events[len(events)-1].Timestamp { - t.Error("Events not properly ordered in ascending order") - } -} - -func TestCreateDeleteStatementAndArgs(t *testing.T) { - tests := []struct { - name string - opts []OpOption - wantStatement string - wantArgs []any - wantErr bool - }{ - { - name: "no options", - opts: []OpOption{}, - wantStatement: fmt.Sprintf("DELETE FROM %s", - TableNameHWSlowdown, - ), - wantArgs: nil, - wantErr: false, - }, - { - name: "with before unix seconds and limit", - opts: []OpOption{ - WithBefore(time.Unix(1234, 0)), - WithLimit(10), - }, - wantStatement: fmt.Sprintf("DELETE FROM %s WHERE %s < ?", - TableNameHWSlowdown, - ColumnUnixSeconds, - ), - wantArgs: []any{int64(1234)}, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotStatement, gotArgs, err := createDeleteStatementAndArgs(tt.opts...) - - if tt.wantErr { - if err == nil { - t.Error("createDeleteStatementAndArgs() error = nil, wantErr = true") - } - return - } - - if err != nil { - t.Errorf("createDeleteStatementAndArgs() error = %v, wantErr = false", err) - return - } - - if gotStatement != tt.wantStatement { - t.Errorf("createDeleteStatementAndArgs() statement = %v, want %v", gotStatement, tt.wantStatement) - } - - if !reflect.DeepEqual(gotArgs, tt.wantArgs) { - t.Errorf("createDeleteStatementAndArgs() args = %v, want %v", gotArgs, tt.wantArgs) - } - }) - } -} - -func TestPurge(t *testing.T) { - t.Parallel() - - now := time.Now() - - tests := []struct { - name string - setup []Event - purgeOpts []OpOption - wantErr bool - wantPurged int - wantCount int - validate func(*testing.T, []Event) - }{ - { - name: "delete events before timestamp", - setup: []Event{ - {Timestamp: 1000, DataSource: "nvml", GPUUUID: "1", Reasons: []string{"detail1"}}, - {Timestamp: 2000, DataSource: "nvml", GPUUUID: "2", Reasons: []string{"detail2"}}, - {Timestamp: 3000, DataSource: "nvml", GPUUUID: "3", Reasons: []string{"detail3"}}, - }, - purgeOpts: []OpOption{WithBefore(time.Unix(2500, 0))}, - wantPurged: 2, - wantCount: 1, - validate: func(t *testing.T, events []Event) { - if len(events) == 0 || events[0].Timestamp != 3000 { - t.Errorf("expected event with timestamp 3000, got %+v", events) - } - }, - }, - { - name: "delete all events", - setup: []Event{ - {Timestamp: 1000, DataSource: "nvml", GPUUUID: "1", Reasons: []string{"detail1"}}, - {Timestamp: 2000, DataSource: "nvml", GPUUUID: "2", Reasons: []string{"detail2"}}, - }, - purgeOpts: []OpOption{}, - wantPurged: 2, - wantCount: 0, - }, - { - name: "delete events with large dataset", - setup: func() []Event { - events := make([]Event, 100) - baseTime := now.Unix() - for i := 0; i < 100; i++ { - events[i] = Event{ - Timestamp: baseTime + int64(i*60), // Events 1 minute apart - DataSource: "nvml", - GPUUUID: fmt.Sprintf("%d", i+1), - Reasons: []string{fmt.Sprintf("detail%d", i+1)}, - } - } - return events - }(), - purgeOpts: []OpOption{WithBefore(now.Add(30 * time.Minute))}, - wantPurged: 30, - wantCount: 70, - validate: func(t *testing.T, events []Event) { - if len(events) != 70 { - t.Errorf("expected 70 events, got %d", len(events)) - } - cutoff := now.Add(30 * time.Minute).Unix() - for _, e := range events { - if e.Timestamp < cutoff { - t.Errorf("found event with timestamp %d, which is before cutoff %d", - e.Timestamp, cutoff) - } - } - }, - }, - } - - for _, tt := range tests { - tt := tt // capture range variable - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // setup fresh database for each test - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Insert test data - for _, event := range tt.setup { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert test event: %v", err) - } - } - - // perform deletion - purged, err := Purge(ctx, db, tt.purgeOpts...) - if (err != nil) != tt.wantErr { - t.Errorf("DeleteEvents() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if purged != tt.wantPurged { - t.Errorf("DeleteEvents() purged = %v, want %v", purged, tt.wantPurged) - } - - // verify results - events, err := ReadEvents(ctx, db) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - if len(events) != tt.wantCount { - t.Errorf("expected %d events, got %d", tt.wantCount, len(events)) - } - - if tt.validate != nil { - tt.validate(t, events) - } - }) - } -} - -func setupTestDB(t *testing.T) (*sql.DB, func()) { - tmpfile, err := os.CreateTemp("", "test-nvidia-*.db") - if err != nil { - t.Fatalf("failed to create temp file: %v", err) - } - tmpfile.Close() - - db, err := sqlite.Open(tmpfile.Name()) - if err != nil { - os.Remove(tmpfile.Name()) - t.Fatalf("failed to open database: %v", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := CreateTable(ctx, db); err != nil { - db.Close() - os.Remove(tmpfile.Name()) - t.Fatalf("failed to create table: %v", err) - } - - cleanup := func() { - db.Close() - os.Remove(tmpfile.Name()) - } - return db, cleanup -} - -func TestDedupEvents(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Test case 1: Basic deduplication with nvml and nvidia-smi - // place nvidia-smi first but read will return nvml first - events := []Event{ - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup enabled - readEvents, err := ReadEvents(ctx, db, WithDedupDataSource(true)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should only get one event (the nvml one) - if len(readEvents) != 1 { - t.Errorf("expected 1 event after dedup, got %d", len(readEvents)) - } - - if len(readEvents) > 0 { - got := readEvents[0] - want := events[1] // The nvml event - if got.DataSource != want.DataSource { - t.Errorf("expected data source %s, got %s", want.DataSource, got.DataSource) - } - if !reflect.DeepEqual(got.Reasons, want.Reasons) { - t.Errorf("expected reasons %v, got %v", want.Reasons, got.Reasons) - } - } - - // Test case 2: Multiple events with different timestamps - events = []Event{ - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1001, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail2"}, - }, - } - - // Clear the table - if _, err := db.ExecContext(ctx, "DELETE FROM "+TableNameHWSlowdown); err != nil { - t.Fatalf("failed to clear table: %v", err) - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup enabled - readEvents, err = ReadEvents(ctx, db, WithDedupDataSource(true)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get both events since they have different timestamps - if len(readEvents) != 2 { - t.Errorf("expected 2 events for different timestamps, got %d", len(readEvents)) - } - - // Test case 3: Multiple GPUs at same timestamp - events = []Event{ - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "2", - Reasons: []string{"detail2"}, - }, - } - - // Clear the table - if _, err := db.ExecContext(ctx, "DELETE FROM "+TableNameHWSlowdown); err != nil { - t.Fatalf("failed to clear table: %v", err) - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup enabled - readEvents, err = ReadEvents(ctx, db, WithDedupDataSource(true)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get both events since they are for different GPUs - if len(readEvents) != 2 { - t.Errorf("expected 2 events for different GPUs, got %d", len(readEvents)) - } -} - -func TestDataSourceOrdering(t *testing.T) { - t.Parallel() - - db, cleanup := setupTestDB(t) - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // Test case 1: Same timestamp, same GPU - verify nvml comes before nvidia-smi - events := []Event{ - // Intentionally insert nvidia-smi first to verify ordering is by data source - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup disabled to verify ordering - readEvents, err := ReadEvents(ctx, db, WithDedupDataSource(false)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get both events with nvml first - if len(readEvents) != 2 { - t.Errorf("expected 2 events without dedup, got %d", len(readEvents)) - } - - if len(readEvents) == 2 { - // First event should be nvml - if readEvents[0].DataSource != "nvml" { - t.Errorf("expected first event to be from nvml, got %s", readEvents[0].DataSource) - } - // Second event should be nvidia-smi - if readEvents[1].DataSource != "nvidia-smi" { - t.Errorf("expected second event to be from nvidia-smi, got %s", readEvents[1].DataSource) - } - } - - // Test case 2: Multiple events with mixed timestamps - verify ordering within same timestamp - events = []Event{ - { - Timestamp: 1000, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1001, - DataSource: "nvidia-smi", - GPUUUID: "1", - Reasons: []string{"detail2"}, - }, - { - Timestamp: 1000, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail1"}, - }, - { - Timestamp: 1001, - DataSource: "nvml", - GPUUUID: "1", - Reasons: []string{"detail2"}, - }, - } - - // Clear the table - if _, err := db.ExecContext(ctx, "DELETE FROM "+TableNameHWSlowdown); err != nil { - t.Fatalf("failed to clear table: %v", err) - } - - // Insert events - for _, event := range events { - if err := InsertEvent(ctx, db, event); err != nil { - t.Fatalf("failed to insert event: %v", err) - } - } - - // Read events with dedup disabled - readEvents, err = ReadEvents(ctx, db, WithDedupDataSource(false)) - if err != nil { - t.Fatalf("failed to read events: %v", err) - } - - // Should get all 4 events - if len(readEvents) != 4 { - t.Errorf("expected 4 events without dedup, got %d", len(readEvents)) - } - - if len(readEvents) == 4 { - // For each unique timestamp, nvml should come before nvidia-smi - // Events should be ordered by timestamp DESC, then data source DESC - expectedOrder := []struct { - unixSeconds int64 - dataSource string - }{ - {1001, "nvml"}, - {1001, "nvidia-smi"}, - {1000, "nvml"}, - {1000, "nvidia-smi"}, - } - - for i, expected := range expectedOrder { - got := readEvents[i] - if got.Timestamp != expected.unixSeconds || got.DataSource != expected.dataSource { - t.Errorf("event[%d]: expected {unix: %d, source: %s}, got {unix: %d, source: %s}", - i, expected.unixSeconds, expected.dataSource, got.Timestamp, got.DataSource) - } - } - } -} diff --git a/components/accelerator/nvidia/query/metrics/clock/metrics.go b/components/accelerator/nvidia/query/metrics/clock/metrics.go index b9db1971..b2826397 100644 --- a/components/accelerator/nvidia/query/metrics/clock/metrics.go +++ b/components/accelerator/nvidia/query/metrics/clock/metrics.go @@ -4,6 +4,7 @@ package clock import ( "context" "database/sql" + "sync" "time" components_metrics "github.com/leptonai/gpud/components/metrics" @@ -15,6 +16,8 @@ import ( const SubSystem = "accelerator_nvidia_clock" var ( + initOnce sync.Once + lastUpdateUnixSeconds = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "", @@ -59,9 +62,11 @@ var ( ) func InitAveragers(dbRW *sql.DB, dbRO *sql.DB, tableName string) { - hwSlowdownAverager = components_metrics.NewAverager(dbRW, dbRO, tableName, SubSystem+"_hw_slowdown") - hwSlowdownThermalAverager = components_metrics.NewAverager(dbRW, dbRO, tableName, SubSystem+"_hw_slowdown_thermal") - hwSlowdownPowerBrakeAverager = components_metrics.NewAverager(dbRW, dbRO, tableName, SubSystem+"_hw_slowdown_power_brake") + initOnce.Do(func() { + hwSlowdownAverager = components_metrics.NewAverager(dbRW, dbRO, tableName, SubSystem+"_hw_slowdown") + hwSlowdownThermalAverager = components_metrics.NewAverager(dbRW, dbRO, tableName, SubSystem+"_hw_slowdown_thermal") + hwSlowdownPowerBrakeAverager = components_metrics.NewAverager(dbRW, dbRO, tableName, SubSystem+"_hw_slowdown_power_brake") + }) } func ReadHWSlowdown(ctx context.Context, since time.Time) (components_metrics_state.Metrics, error) { diff --git a/components/accelerator/nvidia/query/nvidia_smi_query.go b/components/accelerator/nvidia/query/nvidia_smi_query.go index cb09a6fe..de46d9e9 100644 --- a/components/accelerator/nvidia/query/nvidia_smi_query.go +++ b/components/accelerator/nvidia/query/nvidia_smi_query.go @@ -10,11 +10,14 @@ import ( "sort" "strings" "sync" + "time" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" "github.com/leptonai/gpud/log" "github.com/leptonai/gpud/pkg/file" "github.com/leptonai/gpud/pkg/process" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" ) @@ -467,8 +470,10 @@ func (o *SMIOutput) FindHWSlowdownErrs() []string { return errs } -func (o *SMIOutput) HWSlowdownEvents(unixSeconds int64) []nvidia_hw_slowdown_state.Event { - var events []nvidia_hw_slowdown_state.Event +func (o *SMIOutput) HWSlowdownEvents(unixSeconds int64) []components.Event { + var resultEvents []components.Event + eventTime := time.Unix(unixSeconds, 0).UTC() + for _, g := range o.GPUs { if g.ClockEventReasons == nil { continue @@ -480,12 +485,36 @@ func (o *SMIOutput) HWSlowdownEvents(unixSeconds int64) []nvidia_hw_slowdown_sta } sort.Strings(hwSlowdownErrs) - events = append(events, nvidia_hw_slowdown_state.Event{ - Timestamp: unixSeconds, - DataSource: "nvidia-smi", - GPUUUID: g.ID, - Reasons: hwSlowdownErrs, - }) + if event := createHWSlowdownEventFromNvidiaSMI( + eventTime, + g.ID, + hwSlowdownErrs, + ); event != nil { + resultEvents = append(resultEvents, *event) + } + } + return resultEvents +} + +// createHWSlowdownEventFromNvidiaSMI creates a components.Event from nvidia-smi hardware slowdown reasons. +// Returns nil if there are no hardware slowdown reasons. +func createHWSlowdownEventFromNvidiaSMI( + eventTime time.Time, + gpuUUID string, + slowdownReasons []string, +) *components.Event { + if len(slowdownReasons) == 0 { + return nil + } + + return &components.Event{ + Time: metav1.Time{Time: eventTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: strings.Join(slowdownReasons, ", "), + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": gpuUUID, + }, } - return events } diff --git a/components/accelerator/nvidia/query/nvidia_smi_query_test.go b/components/accelerator/nvidia/query/nvidia_smi_query_test.go index 16a38f70..003cc70a 100644 --- a/components/accelerator/nvidia/query/nvidia_smi_query_test.go +++ b/components/accelerator/nvidia/query/nvidia_smi_query_test.go @@ -8,6 +8,11 @@ import ( "reflect" "testing" "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" ) func TestGetSMIOutput(t *testing.T) { @@ -500,3 +505,151 @@ func TestParseWithAddressingModeError(t *testing.T) { } } } + +func TestCreateHWSlowdownEventFromNvidiaSMI(t *testing.T) { + testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + eventTime time.Time + gpuUUID string + slowdownReasons []string + want *components.Event + }{ + { + name: "no slowdown reasons", + eventTime: testTime, + gpuUUID: "GPU-123", + slowdownReasons: []string{}, + want: nil, + }, + { + name: "nil slowdown reasons", + eventTime: testTime, + gpuUUID: "GPU-123", + slowdownReasons: nil, + want: nil, + }, + { + name: "single slowdown reason", + eventTime: testTime, + gpuUUID: "GPU-5678", + slowdownReasons: []string{ + "HW Slowdown is engaged", + }, + want: &components.Event{ + Time: metav1.Time{Time: testTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown is engaged", + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": "GPU-5678", + }, + }, + }, + { + name: "multiple slowdown reasons", + eventTime: testTime, + gpuUUID: "GPU-ABCD", + slowdownReasons: []string{ + "HW Power Brake Slowdown", + "HW Slowdown is engaged", + "HW Thermal Slowdown", + }, + want: &components.Event{ + Time: metav1.Time{Time: testTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Power Brake Slowdown, HW Slowdown is engaged, HW Thermal Slowdown", + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": "GPU-ABCD", + }, + }, + }, + { + name: "empty gpu uuid", + eventTime: testTime, + gpuUUID: "", + slowdownReasons: []string{ + "HW Slowdown is engaged", + }, + want: &components.Event{ + Time: metav1.Time{Time: testTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown is engaged", + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": "", + }, + }, + }, + { + name: "zero time", + eventTime: time.Time{}, + gpuUUID: "GPU-ZERO", + slowdownReasons: []string{ + "HW Slowdown is engaged", + }, + want: &components.Event{ + Time: metav1.Time{Time: time.Time{}}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown is engaged", + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": "GPU-ZERO", + }, + }, + }, + { + name: "real nvidia-smi output format", + eventTime: testTime, + gpuUUID: "GPU-00000000:01:00.0", + slowdownReasons: []string{ + "GPU-00000000:01:00.0: ClockEventReasons.HWSlowdown.ThermalSlowdown Active", + "GPU-00000000:01:00.0: ClockEventReasons.HWSlowdown.PowerBrakeSlowdown Active", + }, + want: &components.Event{ + Time: metav1.Time{Time: testTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "GPU-00000000:01:00.0: ClockEventReasons.HWSlowdown.ThermalSlowdown Active, " + + "GPU-00000000:01:00.0: ClockEventReasons.HWSlowdown.PowerBrakeSlowdown Active", + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": "GPU-00000000:01:00.0", + }, + }, + }, + { + name: "slowdown reason with special characters", + eventTime: testTime, + gpuUUID: "GPU-SPECIAL", + slowdownReasons: []string{ + "HW Slowdown (temp: 95°C, power: 350W)", + }, + want: &components.Event{ + Time: metav1.Time{Time: testTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "HW Slowdown (temp: 95°C, power: 350W)", + ExtraInfo: map[string]string{ + "data_source": "nvidia-smi", + "gpu_uuid": "GPU-SPECIAL", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := createHWSlowdownEventFromNvidiaSMI(tt.eventTime, tt.gpuUUID, tt.slowdownReasons) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("createHWSlowdownEventFromNvidiaSMI() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/components/accelerator/nvidia/query/nvml/clock_events.go b/components/accelerator/nvidia/query/nvml/clock_events.go index aae606be..ea7d2ce1 100644 --- a/components/accelerator/nvidia/query/nvml/clock_events.go +++ b/components/accelerator/nvidia/query/nvml/clock_events.go @@ -4,14 +4,17 @@ import ( "encoding/json" "fmt" "sort" + "strings" "time" - "github.com/leptonai/gpud/log" - "github.com/NVIDIA/go-nvlib/pkg/nvlib/device" "github.com/NVIDIA/go-nvml/pkg/nvml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" + + "github.com/leptonai/gpud/components" + "github.com/leptonai/gpud/components/common" + "github.com/leptonai/gpud/log" ) // Returns true if clock events is supported by all devices. @@ -262,3 +265,22 @@ func (inst *instance) ClockEventsSupported() bool { return inst.clockEventsSupported } + +// createEventFromClockEvents creates a components.Event from ClockEvents if there are hardware slowdown reasons. +// Returns nil if there are no hardware slowdown reasons. +func createEventFromClockEvents(clockEvents ClockEvents) *components.Event { + if len(clockEvents.HWSlowdownReasons) == 0 { + return nil + } + + return &components.Event{ + Time: clockEvents.Time, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: strings.Join(clockEvents.HWSlowdownReasons, ", "), + ExtraInfo: map[string]string{ + "data_source": "nvml", + "gpu_uuid": clockEvents.UUID, + }, + } +} diff --git a/components/accelerator/nvidia/query/nvml/clock_events_test.go b/components/accelerator/nvidia/query/nvml/clock_events_test.go index c31ae815..21da7e30 100644 --- a/components/accelerator/nvidia/query/nvml/clock_events_test.go +++ b/components/accelerator/nvidia/query/nvml/clock_events_test.go @@ -1,11 +1,21 @@ package nvml import ( + "fmt" + "os" + "reflect" "testing" + "time" + "github.com/NVIDIA/go-nvlib/pkg/nvlib/device" "github.com/NVIDIA/go-nvml/pkg/nvml" "github.com/NVIDIA/go-nvml/pkg/nvml/mock" + "github.com/leptonai/gpud/components" "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml/testutil" + "github.com/leptonai/gpud/components/common" + mocknvml "github.com/leptonai/gpud/e2e/mock/nvml" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestGetClockEventReasons(t *testing.T) { @@ -231,3 +241,340 @@ func TestGetClockEvents(t *testing.T) { }) } } + +func TestClockEventsSupported(t *testing.T) { + tests := []struct { + name string + mockDevices []*mock.Device + mockDeviceErr error + expectedResult bool + expectError bool + }{ + { + name: "all devices support clock events", + mockDevices: []*mock.Device{ + { + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.SUCCESS + }, + GetNameFunc: func() (string, nvml.Return) { + return "Tesla V100", nvml.SUCCESS + }, + GetUUIDFunc: func() (string, nvml.Return) { + return "GPU-1234", nvml.SUCCESS + }, + GetMinorNumberFunc: func() (int, nvml.Return) { + return 0, nvml.SUCCESS + }, + }, + { + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.SUCCESS + }, + GetNameFunc: func() (string, nvml.Return) { + return "Tesla V100", nvml.SUCCESS + }, + GetUUIDFunc: func() (string, nvml.Return) { + return "GPU-5678", nvml.SUCCESS + }, + GetMinorNumberFunc: func() (int, nvml.Return) { + return 1, nvml.SUCCESS + }, + }, + }, + expectedResult: true, + expectError: false, + }, + { + name: "one device supports clock events", + mockDevices: []*mock.Device{ + { + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.ERROR_NOT_SUPPORTED + }, + GetNameFunc: func() (string, nvml.Return) { + return "Tesla V100", nvml.SUCCESS + }, + GetUUIDFunc: func() (string, nvml.Return) { + return "GPU-1234", nvml.SUCCESS + }, + GetMinorNumberFunc: func() (int, nvml.Return) { + return 0, nvml.SUCCESS + }, + }, + { + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.SUCCESS + }, + GetNameFunc: func() (string, nvml.Return) { + return "Tesla V100", nvml.SUCCESS + }, + GetUUIDFunc: func() (string, nvml.Return) { + return "GPU-5678", nvml.SUCCESS + }, + GetMinorNumberFunc: func() (int, nvml.Return) { + return 1, nvml.SUCCESS + }, + }, + }, + expectedResult: true, + expectError: false, + }, + { + name: "no devices support clock events", + mockDevices: []*mock.Device{ + { + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.ERROR_NOT_SUPPORTED + }, + GetNameFunc: func() (string, nvml.Return) { + return "Tesla V100", nvml.SUCCESS + }, + GetUUIDFunc: func() (string, nvml.Return) { + return "GPU-1234", nvml.SUCCESS + }, + GetMinorNumberFunc: func() (int, nvml.Return) { + return 0, nvml.SUCCESS + }, + }, + }, + expectedResult: false, + expectError: false, + }, + { + name: "error getting devices", + mockDeviceErr: fmt.Errorf("failed to get devices"), + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock devices + mockDevices := make([]device.Device, len(tt.mockDevices)) + for i, d := range tt.mockDevices { + mockDevices[i] = testutil.CreateDevice(d) + } + + // Mock NVML + mockNVML := &mock.Interface{ + InitFunc: func() nvml.Return { + return nvml.SUCCESS + }, + DeviceGetCountFunc: func() (int, nvml.Return) { + if tt.mockDeviceErr != nil { + return 0, nvml.ERROR_UNKNOWN + } + return len(mockDevices), nvml.SUCCESS + }, + DeviceGetHandleByIndexFunc: func(idx int) (nvml.Device, nvml.Return) { + if tt.mockDeviceErr != nil { + return nil, nvml.ERROR_UNKNOWN + } + if idx < 0 || idx >= len(tt.mockDevices) { + return nil, nvml.ERROR_INVALID_ARGUMENT + } + return tt.mockDevices[idx], nvml.SUCCESS + }, + } + + // Set up mock NVML environment + err := os.Setenv(mocknvml.EnvNVMLMock, "true") + if err != nil { + t.Fatalf("failed to set mock NVML environment: %v", err) + } + defer os.Unsetenv(mocknvml.EnvNVMLMock) + + // Replace the mock instance + originalMockInstance := mocknvml.MockInstance + mocknvml.MockInstance = mockNVML + defer func() { mocknvml.MockInstance = originalMockInstance }() + + result, err := ClockEventsSupported() + if tt.expectError { + if err == nil { + t.Error("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + if result != tt.expectedResult { + t.Errorf("ClockEventsSupported() = %v, want %v", result, tt.expectedResult) + } + }) + } +} + +func TestClockEventsSupportedByDevice(t *testing.T) { + tests := []struct { + name string + mockDevice *mock.Device + expectedResult bool + expectError bool + }{ + { + name: "device supports clock events", + mockDevice: &mock.Device{ + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.SUCCESS + }, + }, + expectedResult: true, + expectError: false, + }, + { + name: "device does not support clock events", + mockDevice: &mock.Device{ + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.ERROR_NOT_SUPPORTED + }, + }, + expectedResult: false, + expectError: false, + }, + { + name: "device returns error", + mockDevice: &mock.Device{ + GetCurrentClocksEventReasonsFunc: func() (uint64, nvml.Return) { + return 0, nvml.ERROR_UNKNOWN + }, + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := ClockEventsSupportedByDevice(testutil.CreateDevice(tt.mockDevice)) + if tt.expectError { + if err == nil { + t.Error("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + if result != tt.expectedResult { + t.Errorf("ClockEventsSupportedByDevice() = %v, want %v", result, tt.expectedResult) + } + }) + } +} + +func TestClockEventsJSONAndYAML(t *testing.T) { + testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + tests := []struct { + name string + clockEvents *ClockEvents + wantJSON string + wantYAML string + }{ + { + name: "valid clock events", + clockEvents: &ClockEvents{ + Time: metav1.Time{Time: testTime}, + UUID: "GPU-123", + ReasonsBitmask: reasonHWSlowdown, + HWSlowdownReasons: []string{"test reason"}, + HWSlowdown: true, + Supported: true, + }, + wantJSON: `{"time":"2024-01-01T00:00:00Z","uuid":"GPU-123","reasons_bitmask":8,"hw_slowdown_reasons":["test reason"],"hw_slowdown":true,"hw_thermal_slowdown":false,"hw_slowdown_power_brake":false,"supported":true}`, + wantYAML: `hw_slowdown: true +hw_slowdown_power_brake: false +hw_slowdown_reasons: +- test reason +hw_thermal_slowdown: false +reasons_bitmask: 8 +supported: true +time: "2024-01-01T00:00:00Z" +uuid: GPU-123 +`, + }, + { + name: "nil clock events", + clockEvents: nil, + wantJSON: "", + wantYAML: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test JSON marshaling + gotJSON, err := tt.clockEvents.JSON() + if err != nil { + t.Errorf("ClockEvents.JSON() error = %v", err) + return + } + if string(gotJSON) != tt.wantJSON { + t.Errorf("ClockEvents.JSON() = %v, want %v", string(gotJSON), tt.wantJSON) + } + + // Test YAML marshaling + gotYAML, err := tt.clockEvents.YAML() + if err != nil { + t.Errorf("ClockEvents.YAML() error = %v", err) + return + } + if string(gotYAML) != tt.wantYAML { + t.Errorf("ClockEvents.YAML() = %v, want %v", string(gotYAML), tt.wantYAML) + } + }) + } +} + +func TestCreateEventFromClockEvents(t *testing.T) { + testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + clockEvents ClockEvents + want *components.Event + }{ + { + name: "no hardware slowdown reasons", + clockEvents: ClockEvents{ + Time: metav1.Time{Time: testTime}, + UUID: "GPU-123", + }, + want: nil, + }, + { + name: "with hardware slowdown reasons", + clockEvents: ClockEvents{ + Time: metav1.Time{Time: testTime}, + UUID: "GPU-123", + HWSlowdownReasons: []string{"reason1", "reason2"}, + }, + want: &components.Event{ + Time: metav1.Time{Time: testTime}, + Name: "hw_slowdown", + Type: common.EventTypeWarning, + Message: "reason1, reason2", + ExtraInfo: map[string]string{ + "data_source": "nvml", + "gpu_uuid": "GPU-123", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := createEventFromClockEvents(tt.clockEvents) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("createEventFromClockEvents() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/components/accelerator/nvidia/query/nvml/nvml.go b/components/accelerator/nvidia/query/nvml/nvml.go index a3839ad1..c40d387f 100644 --- a/components/accelerator/nvidia/query/nvml/nvml.go +++ b/components/accelerator/nvidia/query/nvml/nvml.go @@ -16,8 +16,8 @@ import ( "github.com/NVIDIA/go-nvml/pkg/nvml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state" + events_db "github.com/leptonai/gpud/components/db" mocknvml "github.com/leptonai/gpud/e2e/mock/nvml" "github.com/leptonai/gpud/log" ) @@ -88,6 +88,9 @@ type instance struct { // read-only database instance dbRO *sql.DB + xidEventsStore events_db.Store + hwslowdownEventsStore events_db.Store + clockEventsSupported bool xidErrorSupported bool @@ -257,6 +260,9 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { dbRW: op.dbRW, dbRO: op.dbRO, + xidEventsStore: op.xidEventsStore, + hwslowdownEventsStore: op.hwslowdownEventsStore, + clockEventsSupported: clockEventsSupported, xidErrorSupported: false, @@ -504,28 +510,21 @@ func (inst *instance) Get() (*Output, error) { latestInfo.ClockEvents = &clockEvents - ev := nvidia_hw_slowdown_state.Event{ - Timestamp: clockEvents.Time.Unix(), - DataSource: "nvml", - GPUUUID: devInfo.UUID, - Reasons: clockEvents.HWSlowdownReasons, - } - - cctx, ccancel := context.WithTimeout(context.Background(), 15*time.Second) - found, err := nvidia_hw_slowdown_state.FindEvent(cctx, inst.dbRO, ev) - ccancel() - if err != nil { - log.Logger.Warnw("failed to find clock events from db", "error", err, "gpu_uuid", devInfo.UUID) - joinedErrs = append(joinedErrs, fmt.Errorf("failed to find clock events: %w (GPU uuid %s)", err, devInfo.UUID)) - } else if !found { - log.Logger.Warnw("detected hw slowdown clock events", "hwSlowdownReasons", clockEvents.HWSlowdownReasons) - - cctx, ccancel = context.WithTimeout(context.Background(), 15*time.Second) - err = nvidia_hw_slowdown_state.InsertEvent(cctx, inst.dbRW, ev) + ev := createEventFromClockEvents(clockEvents) + if ev != nil { + cctx, ccancel := context.WithTimeout(context.Background(), 15*time.Second) + found, err := inst.hwslowdownEventsStore.Find(cctx, *ev) ccancel() if err != nil { - log.Logger.Warnw("failed to insert clock events to db", "error", err, "gpu_uuid", devInfo.UUID) - joinedErrs = append(joinedErrs, fmt.Errorf("failed to insert clock events: %w (GPU uuid %s)", err, devInfo.UUID)) + log.Logger.Warnw("failed to find clock events from db", "error", err, "gpu_uuid", devInfo.UUID) + joinedErrs = append(joinedErrs, fmt.Errorf("failed to find clock events: %w (GPU uuid %s)", err, devInfo.UUID)) + } else if found != nil { + cctx, ccancel = context.WithTimeout(context.Background(), 15*time.Second) + err = inst.hwslowdownEventsStore.Insert(cctx, *ev) + ccancel() + if err != nil { + log.Logger.Warnw("failed to insert clock events to db", "error", err, "gpu_uuid", devInfo.UUID) + } } } } diff --git a/components/accelerator/nvidia/query/query.go b/components/accelerator/nvidia/query/query.go index 119defd0..0df022d0 100644 --- a/components/accelerator/nvidia/query/query.go +++ b/components/accelerator/nvidia/query/query.go @@ -11,7 +11,6 @@ import ( "sync" "time" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" "github.com/leptonai/gpud/components/accelerator/nvidia/query/infiniband" metrics_clock "github.com/leptonai/gpud/components/accelerator/nvidia/query/metrics/clock" metrics_clockspeed "github.com/leptonai/gpud/components/accelerator/nvidia/query/metrics/clock-speed" @@ -287,23 +286,23 @@ func Get(ctx context.Context, opts ...OpOption) (output any, err error) { events := o.SMI.HWSlowdownEvents(truncNowUTC.Unix()) for _, event := range events { cctx, ccancel = context.WithTimeout(ctx, time.Minute) - found, err := nvidia_hw_slowdown_state.FindEvent(cctx, op.dbRO, event) + found, err := op.hwslowdownEventsStore.Find(cctx, event) ccancel() if err != nil { - log.Logger.Warnw("failed to find clock events from db", "error", err, "gpu_uuid", event.GPUUUID) + log.Logger.Warnw("failed to find clock events from db", "error", err, "info", event.ExtraInfo) o.SMIQueryErrors = append(o.SMIQueryErrors, fmt.Sprintf("failed to find clock events: %v", err)) continue } - if found { + if found != nil { continue } - log.Logger.Warnw("detected hw slowdown clock events", "gpu_uuid", event.GPUUUID) + log.Logger.Warnw("detected hw slowdown clock events", "info", event.ExtraInfo) cctx, ccancel = context.WithTimeout(ctx, time.Minute) - err = nvidia_hw_slowdown_state.InsertEvent(cctx, op.dbRW, event) + err = op.hwslowdownEventsStore.Insert(cctx, event) ccancel() if err != nil { - log.Logger.Warnw("failed to insert clock events to db", "error", err, "gpu_uuid", event.GPUUUID) + log.Logger.Warnw("failed to insert clock events to db", "error", err, "info", event.ExtraInfo) o.SMIQueryErrors = append(o.SMIQueryErrors, fmt.Sprintf("failed to persist clock events: %v", err)) } } diff --git a/components/diagnose/scan.go b/components/diagnose/scan.go index 561c7805..1fc329c6 100644 --- a/components/diagnose/scan.go +++ b/components/diagnose/scan.go @@ -10,7 +10,6 @@ import ( nvidia_component_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id" nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" "github.com/leptonai/gpud/components/accelerator/nvidia/query" nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query" nvidia_query_nvml "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml" @@ -148,13 +147,6 @@ func Scan(ctx context.Context, opts ...OpOption) error { log.Logger.Fatalw("failed to create events store", "error", err) } - // "nvidia_query.Get" assumes that the "clock-events-state" table exists - // pre-create since this is a one-off operation - // TODO: move these into a single place - if err := nvidia_hw_slowdown_state.CreateTable(ctx, db); err != nil { - log.Logger.Fatalw("failed to create clock events state table", "error", err) - } - outputRaw, err := nvidia_query.Get( ctx, nvidia_query.WithDBRW(db), // to deprecate in favor of events store diff --git a/internal/server/server.go b/internal/server/server.go index e425b2aa..00facaa0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -51,7 +51,6 @@ import ( nvidia_gsp_firmware_mode_id "github.com/leptonai/gpud/components/accelerator/nvidia/gsp-firmware-mode/id" nvidia_hw_slowdown "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown" nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id" - nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state" nvidia_infiniband "github.com/leptonai/gpud/components/accelerator/nvidia/infiniband" nvidia_infiniband_id "github.com/leptonai/gpud/components/accelerator/nvidia/infiniband/id" nvidia_info "github.com/leptonai/gpud/components/accelerator/nvidia/info" @@ -318,29 +317,6 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID } }() - if err := nvidia_hw_slowdown_state.CreateTable(ctx, dbRW); err != nil { - return nil, fmt.Errorf("failed to create nvidia clock events table: %w", err) - } - go func() { - dur := nvidia_hw_slowdown_state.DefaultRetentionPeriod - for { - select { - case <-ctx.Done(): - return - case <-time.After(dur): - now := time.Now().UTC() - before := now.Add(-dur) - - purged, err := nvidia_hw_slowdown_state.Purge(ctx, dbRW, nvidia_hw_slowdown_state.WithBefore(before)) - if err != nil { - log.Logger.Warnw("failed to delete nvidia clock events", "error", err) - } else { - log.Logger.Debugw("deleted nvidia clock events", "before", before, "purged", purged) - } - } - } - }() - if err := pci_state.CreateTable(ctx, dbRW); err != nil { return nil, fmt.Errorf("failed to create pci state table: %w", err) } @@ -867,7 +843,7 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("failed to validate component %s config: %w", k, err) } - c, err := nvidia_hw_slowdown.New(ctx, cfg) + c, err := nvidia_hw_slowdown.New(ctx, cfg, eventsStoreNvidiaHWSlowdown) if err != nil { return nil, fmt.Errorf("failed to create component %s: %w", k, err) } diff --git a/pkg/dmesg/watcher_test.go b/pkg/dmesg/watcher_test.go index 739abfb8..f6621f6a 100644 --- a/pkg/dmesg/watcher_test.go +++ b/pkg/dmesg/watcher_test.go @@ -7,6 +7,8 @@ import ( "strings" "testing" "time" + + "github.com/leptonai/gpud/pkg/process" ) func TestWatch(t *testing.T) { @@ -436,3 +438,42 @@ func TestWatchWithError(t *testing.T) { t.Error("expected to see an error line") } } + +func TestReadContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan LogLine, 1000) + + // Create a command that will run for a while + p, err := process.New(process.WithCommand("sleep", "10")) + if err != nil { + t.Fatalf("failed to create process: %v", err) + } + if err := p.Start(ctx); err != nil { + t.Fatalf("failed to start process: %v", err) + } + + // Start reading in a goroutine + go read(ctx, p, ch) + + // Give it a moment to start + time.Sleep(time.Second) + + // Cancel the context + cancel() + + // Wait for the channel to close + timer := time.NewTimer(1 * time.Second) + select { + case _, ok := <-ch: + if !ok { + // Channel closed as expected + return + } + + // just log for slow CI + t.Log("channel should have been closed after context cancellation") + + case <-timer.C: + t.Error("timeout waiting for channel to close after context cancellation") + } +}