Skip to content

Commit

Permalink
feat(nvidia/query): pass events store to shared poller
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 30, 2025
1 parent c925761 commit 82f693d
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 9 deletions.
22 changes: 19 additions & 3 deletions components/accelerator/nvidia/query/nvml/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"database/sql"

"github.com/NVIDIA/go-nvml/pkg/nvml"

events_db "github.com/leptonai/gpud/components/db"
"github.com/leptonai/gpud/pkg/sqlite"
)

type Op struct {
dbRW *sql.DB
dbRO *sql.DB
gpmMetricsIDs map[nvml.GpmMetricId]struct{}
dbRW *sql.DB
dbRO *sql.DB
xidEventsStore events_db.Store
hwslowdownEventsStore events_db.Store
gpmMetricsIDs map[nvml.GpmMetricId]struct{}
}

type OpOption func(*Op)
Expand Down Expand Up @@ -53,6 +57,18 @@ func WithDBRO(db *sql.DB) OpOption {
}
}

func WithXidEventsStore(store events_db.Store) OpOption {
return func(op *Op) {
op.xidEventsStore = store
}
}

func WithHWSlowdownEventsStore(store events_db.Store) OpOption {
return func(op *Op) {
op.hwslowdownEventsStore = store
}
}

func WithGPMMetricsID(ids ...nvml.GpmMetricId) OpOption {
return func(op *Op) {
if op.gpmMetricsIDs == nil {
Expand Down
105 changes: 105 additions & 0 deletions components/accelerator/nvidia/query/nvml/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package nvml

import (
"database/sql"
"testing"

"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

events_db "github.com/leptonai/gpud/components/db"
)

func TestOpOptions(t *testing.T) {
t.Run("default values", func(t *testing.T) {
op := &Op{}
err := op.applyOpts([]OpOption{})
require.NoError(t, err)

// Check that default in-memory databases are created
assert.NotNil(t, op.dbRW)
assert.NotNil(t, op.dbRO)
assert.Nil(t, op.xidEventsStore)
assert.Nil(t, op.hwslowdownEventsStore)
assert.Nil(t, op.gpmMetricsIDs)
})

t.Run("custom values", func(t *testing.T) {
mockDB := &sql.DB{}
mockStore := &mockEventsStore{}
testMetrics := []nvml.GpmMetricId{
nvml.GPM_METRIC_SM_OCCUPANCY,
nvml.GPM_METRIC_FP32_UTIL,
}

op := &Op{}
err := op.applyOpts([]OpOption{
WithDBRW(mockDB),
WithDBRO(mockDB),
WithXidEventsStore(mockStore),
WithHWSlowdownEventsStore(mockStore),
WithGPMMetricsID(testMetrics...),
})
require.NoError(t, err)

// Check custom values
assert.Equal(t, mockDB, op.dbRW)
assert.Equal(t, mockDB, op.dbRO)
assert.Equal(t, mockStore, op.xidEventsStore)
assert.Equal(t, mockStore, op.hwslowdownEventsStore)

// Check GPM metrics
assert.NotNil(t, op.gpmMetricsIDs)
assert.Len(t, op.gpmMetricsIDs, len(testMetrics))
for _, metric := range testMetrics {
_, exists := op.gpmMetricsIDs[metric]
assert.True(t, exists, "Metric %v should exist in gpmMetricsIDs", metric)
}
})

t.Run("partial options", func(t *testing.T) {
mockDB := &sql.DB{}
testMetrics := []nvml.GpmMetricId{nvml.GPM_METRIC_SM_OCCUPANCY}

op := &Op{}
err := op.applyOpts([]OpOption{
WithDBRW(mockDB),
WithGPMMetricsID(testMetrics...),
})
require.NoError(t, err)

// Check mixed custom and default values
assert.Equal(t, mockDB, op.dbRW)
assert.NotNil(t, op.dbRO) // Should create default read-only DB
assert.Nil(t, op.xidEventsStore)
assert.Nil(t, op.hwslowdownEventsStore)

// Check GPM metrics
assert.NotNil(t, op.gpmMetricsIDs)
assert.Len(t, op.gpmMetricsIDs, 1)
_, exists := op.gpmMetricsIDs[nvml.GPM_METRIC_SM_OCCUPANCY]
assert.True(t, exists)
})

t.Run("multiple GPM metrics", func(t *testing.T) {
op := &Op{}
// Add metrics in multiple calls to test accumulation
err := op.applyOpts([]OpOption{
WithGPMMetricsID(nvml.GPM_METRIC_SM_OCCUPANCY),
WithGPMMetricsID(nvml.GPM_METRIC_FP32_UTIL),
})
require.NoError(t, err)

assert.Len(t, op.gpmMetricsIDs, 2)
_, exists := op.gpmMetricsIDs[nvml.GPM_METRIC_SM_OCCUPANCY]
assert.True(t, exists)
_, exists = op.gpmMetricsIDs[nvml.GPM_METRIC_FP32_UTIL]
assert.True(t, exists)
})
}

// mockEventsStore implements events_db.Store interface for testing
type mockEventsStore struct {
events_db.Store
}
20 changes: 19 additions & 1 deletion components/accelerator/nvidia/query/options.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package query

import "database/sql"
import (
"database/sql"

events_db "github.com/leptonai/gpud/components/db"
)

type Op struct {
dbRW *sql.DB
dbRO *sql.DB
xidEventsStore events_db.Store
hwslowdownEventsStore events_db.Store
nvidiaSMICommand string
nvidiaSMIQueryCommand string
ibstatCommand string
Expand Down Expand Up @@ -47,6 +53,18 @@ func WithDBRO(db *sql.DB) OpOption {
}
}

func WithXidEventsStore(store events_db.Store) OpOption {
return func(op *Op) {
op.xidEventsStore = store
}
}

func WithHWSlowdownEventsStore(store events_db.Store) OpOption {
return func(op *Op) {
op.hwslowdownEventsStore = store
}
}

// Specifies the nvidia-smi binary path to overwrite the default path.
func WithNvidiaSMICommand(p string) OpOption {
return func(op *Op) {
Expand Down
77 changes: 77 additions & 0 deletions components/accelerator/nvidia/query/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package query

import (
"database/sql"
"testing"

events_db "github.com/leptonai/gpud/components/db"
"github.com/stretchr/testify/assert"
)

func TestOpOptions(t *testing.T) {
t.Run("default values", func(t *testing.T) {
op := &Op{}
err := op.applyOpts([]OpOption{})
assert.NoError(t, err)

// Check default values
assert.Equal(t, "nvidia-smi", op.nvidiaSMICommand)
assert.Equal(t, "nvidia-smi --query", op.nvidiaSMIQueryCommand)
assert.Equal(t, "ibstat", op.ibstatCommand)
assert.Equal(t, "/sys/class/infiniband", op.infinibandClassDirectory)
assert.False(t, op.debug)
})

t.Run("custom values", func(t *testing.T) {
mockDB := &sql.DB{}
mockStore := &mockEventsStore{}

op := &Op{}
err := op.applyOpts([]OpOption{
WithDBRW(mockDB),
WithDBRO(mockDB),
WithXidEventsStore(mockStore),
WithHWSlowdownEventsStore(mockStore),
WithNvidiaSMICommand("/custom/nvidia-smi"),
WithNvidiaSMIQueryCommand("/custom/nvidia-smi-query"),
WithIbstatCommand("/custom/ibstat"),
WithInfinibandClassDirectory("/custom/infiniband"),
WithDebug(true),
})

assert.NoError(t, err)

// Check custom values
assert.Equal(t, mockDB, op.dbRW)
assert.Equal(t, mockDB, op.dbRO)
assert.Equal(t, mockStore, op.xidEventsStore)
assert.Equal(t, mockStore, op.hwslowdownEventsStore)
assert.Equal(t, "/custom/nvidia-smi", op.nvidiaSMICommand)
assert.Equal(t, "/custom/nvidia-smi-query", op.nvidiaSMIQueryCommand)
assert.Equal(t, "/custom/ibstat", op.ibstatCommand)
assert.Equal(t, "/custom/infiniband", op.infinibandClassDirectory)
assert.True(t, op.debug)
})

t.Run("partial options", func(t *testing.T) {
op := &Op{}
err := op.applyOpts([]OpOption{
WithNvidiaSMICommand("/custom/nvidia-smi"),
WithDebug(true),
})

assert.NoError(t, err)

// Check mixed default and custom values
assert.Equal(t, "/custom/nvidia-smi", op.nvidiaSMICommand)
assert.Equal(t, "nvidia-smi --query", op.nvidiaSMIQueryCommand)
assert.Equal(t, "ibstat", op.ibstatCommand)
assert.Equal(t, "/sys/class/infiniband", op.infinibandClassDirectory)
assert.True(t, op.debug)
})
}

// mockEventsStore implements events_db.Store interface for testing
type mockEventsStore struct {
events_db.Store
}
6 changes: 4 additions & 2 deletions components/accelerator/nvidia/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ func Get(ctx context.Context, opts ...OpOption) (output any, err error) {

if err := nvml.StartDefaultInstance(
ctx,
nvml.WithDBRW(op.dbRW),
nvml.WithDBRO(op.dbRO),
nvml.WithDBRW(op.dbRW), // to deprecate in favor of events store
nvml.WithDBRO(op.dbRO), // to deprecate in favor of events store
nvml.WithXidEventsStore(op.xidEventsStore),
nvml.WithHWSlowdownEventsStore(op.hwslowdownEventsStore),

Check warning on line 98 in components/accelerator/nvidia/query/query.go

View check run for this annotation

Codecov / codecov/patch

components/accelerator/nvidia/query/query.go#L95-L98

Added lines #L95 - L98 were not covered by tests
nvml.WithGPMMetricsID(
go_nvml.GPM_METRIC_SM_OCCUPANCY,
go_nvml.GPM_METRIC_INTEGER_UTIL,
Expand Down
31 changes: 28 additions & 3 deletions components/diagnose/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"runtime"
"time"

"github.com/dustin/go-humanize"
nvidia_component_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id"
nvidia_hw_slowdown_id "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/id"
nvidia_hw_slowdown_state "github.com/leptonai/gpud/components/accelerator/nvidia/hw-slowdown/state"
"github.com/leptonai/gpud/components/accelerator/nvidia/query"
nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query"
nvidia_query_nvml "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml"
nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
events_db "github.com/leptonai/gpud/components/db"
"github.com/leptonai/gpud/components/dmesg"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/leptonai/gpud/pkg/process"
"github.com/leptonai/gpud/pkg/sqlite"

"github.com/dustin/go-humanize"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -125,6 +128,26 @@ func Scan(ctx context.Context, opts ...OpOption) error {
}
defer db.Close()

eventsStoreNvidiaErrorXid, err := events_db.NewStore(
db,
db,
events_db.CreateDefaultTableName(nvidia_component_error_xid_id.Name),
3*24*time.Hour,
)
if err != nil {
log.Logger.Fatalw("failed to create events store", "error", err)
}

Check warning on line 139 in components/diagnose/scan.go

View check run for this annotation

Codecov / codecov/patch

components/diagnose/scan.go#L131-L139

Added lines #L131 - L139 were not covered by tests

eventsStoreNvidiaHWSlowdown, err := events_db.NewStore(
db,
db,
events_db.CreateDefaultTableName(nvidia_hw_slowdown_id.Name),
3*24*time.Hour,
)
if err != nil {
log.Logger.Fatalw("failed to create events store", "error", err)
}

Check warning on line 149 in components/diagnose/scan.go

View check run for this annotation

Codecov / codecov/patch

components/diagnose/scan.go#L141-L149

Added lines #L141 - L149 were not covered by tests

// "nvidia_query.Get" assumes that the "clock-events-state" table exists
// pre-create since this is a one-off operation
// TODO: move these into a single place
Expand All @@ -134,8 +157,10 @@ func Scan(ctx context.Context, opts ...OpOption) error {

outputRaw, err := nvidia_query.Get(
ctx,
nvidia_query.WithDBRW(db),
nvidia_query.WithDBRO(db),
nvidia_query.WithDBRW(db), // to deprecate in favor of events store
nvidia_query.WithDBRO(db), // to deprecate in favor of events store
nvidia_query.WithXidEventsStore(eventsStoreNvidiaErrorXid),
nvidia_query.WithHWSlowdownEventsStore(eventsStoreNvidiaHWSlowdown),

Check warning on line 163 in components/diagnose/scan.go

View check run for this annotation

Codecov / codecov/patch

components/diagnose/scan.go#L160-L163

Added lines #L160 - L163 were not covered by tests
nvidia_query.WithNvidiaSMICommand(op.nvidiaSMICommand),
nvidia_query.WithNvidiaSMIQueryCommand(op.nvidiaSMIQueryCommand),
nvidia_query.WithIbstatCommand(op.ibstatCommand),
Expand Down
23 changes: 23 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
containerd_pod_id "github.com/leptonai/gpud/components/containerd/pod/id"
"github.com/leptonai/gpud/components/cpu"
cpu_id "github.com/leptonai/gpud/components/cpu/id"
events_db "github.com/leptonai/gpud/components/db"
"github.com/leptonai/gpud/components/disk"
disk_id "github.com/leptonai/gpud/components/disk/id"
"github.com/leptonai/gpud/components/dmesg"
Expand Down Expand Up @@ -196,10 +197,32 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID
if err != nil {
return nil, err
}
var eventsStoreNvidiaErrorXid events_db.Store
var eventsStoreNvidiaHWSlowdown events_db.Store

Check warning on line 201 in internal/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/server/server.go#L200-L201

Added lines #L200 - L201 were not covered by tests
if runtime.GOOS == "linux" && nvidiaInstalled {
eventsStoreNvidiaErrorXid, err = events_db.NewStore(
dbRW,
dbRO,
events_db.CreateDefaultTableName(nvidia_component_error_xid_id.Name),
3*24*time.Hour,
)
if err != nil {
return nil, err
}
eventsStoreNvidiaHWSlowdown, err = events_db.NewStore(
dbRW,
dbRO,
events_db.CreateDefaultTableName(nvidia_hw_slowdown_id.Name),
3*24*time.Hour,
)
if err != nil {
return nil, err
}

Check warning on line 220 in internal/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/server/server.go#L203-L220

Added lines #L203 - L220 were not covered by tests
nvidia_query.SetDefaultPoller(
nvidia_query.WithDBRW(dbRW), // to deprecate in favor of events store
nvidia_query.WithDBRO(dbRO), // to deprecate in favor of events store
nvidia_query.WithXidEventsStore(eventsStoreNvidiaErrorXid),
nvidia_query.WithHWSlowdownEventsStore(eventsStoreNvidiaHWSlowdown),

Check warning on line 225 in internal/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/server/server.go#L224-L225

Added lines #L224 - L225 were not covered by tests
nvidia_query.WithNvidiaSMICommand(options.NvidiaSMICommand),
nvidia_query.WithNvidiaSMIQueryCommand(options.NvidiaSMIQueryCommand),
nvidia_query.WithIbstatCommand(options.IbstatCommand),
Expand Down

0 comments on commit 82f693d

Please sign in to comment.