Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a metric tracking the selected replica #2231

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions internal/datastore/common/url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package common

import (
"errors"
"net/url"
)

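// MetricsIDFromURL extracts the metrics ID from a given datastore URL. The ID is
// the URL's host (including any port) followed by its path; for URLs of the form
// scheme://user:password@host:port/dbname the user info is not part of the result.
// An error is returned if the URL is empty or cannot be parsed.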
func MetricsIDFromURL(dsURL string) (string, error) {
if dsURL == "" {
return "", errors.New("datastore URL is empty")
}

u, err := url.Parse(dsURL)
if err != nil {
return "", errors.New("could not parse datastore URL")
}
return u.Host + u.Path, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that returning host+path will lead to an explosion of cardinality when there are many SpiceDB clusters in the same physical database. I do see the value in disambiguating telemetry across SpiceDB clusters; this is just something to note.

}
45 changes: 45 additions & 0 deletions internal/datastore/common/url_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package common

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestMetricsIDFromURL runs MetricsIDFromURL against a table of datastore URLs
// and verifies either the returned identifier or the reported error message.
func TestMetricsIDFromURL(t *testing.T) {
	// testCase pairs an input URL with the outcome it should produce. A
	// non-empty expectedError means the call must fail with that message.
	type testCase struct {
		dsURL         string
		expected      string
		expectedError string
	}

	cases := []testCase{
		{
			dsURL:         "",
			expected:      "",
			expectedError: "datastore URL is empty",
		},
		{
			dsURL:         "postgres://user:password@localhost:5432/dbname",
			expected:      "localhost:5432/dbname",
			expectedError: "",
		},
		{
			dsURL:         "mysql://user:password@localhost:3306/dbname",
			expected:      "localhost:3306/dbname",
			expectedError: "",
		},
	}

	for _, c := range cases {
		t.Run(c.dsURL, func(t *testing.T) {
			got, err := MetricsIDFromURL(c.dsURL)

			// Success path: no error is expected, so compare the identifier.
			if c.expectedError == "" {
				require.NoError(t, err)
				require.Equal(t, c.expected, got)
				return
			}

			// Failure path: only the error is checked.
			require.Error(t, err)
			require.ErrorContains(t, err, c.expectedError)
		})
	}
}
4 changes: 4 additions & 0 deletions internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR

type ctxProxy struct{ delegate datastore.Datastore }

// MetricsID returns the metrics identifier reported by the wrapped delegate
// datastore, passing through both the value and any error unchanged.
func (p *ctxProxy) MetricsID() (string, error) {
return p.delegate.MetricsID()
}

func (p *ctxProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
}
}

// MetricsID returns the identifier used to label metrics for this datastore.
// It is derived from the datastore's connection URL (cds.dburl) by
// common.MetricsIDFromURL, whose error is returned as-is.
func (cds *crdbDatastore) MetricsID() (string, error) {
return common.MetricsIDFromURL(cds.dburl)
}

func (cds *crdbDatastore) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ type snapshot struct {
db *memdb.MemDB
}

func (mdb *memdbDatastore) MetricsID() (string, error) {
return "memdb-" + mdb.uniqueID, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will almost certainly cause an explosion in cardinality for these metrics. E.g., when a MemDB SpiceDB is embedded in another process, each restart will bring a different uniqueID. I'd personally just have it as memdb - the scenario of multiple SpiceDB processes in memory is less likely, and the explosion in cardinality can have severe cost implications.

}

func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
mdb.RLock()
defer mdb.RUnlock()
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option
return store, nil
}

// MetricsID returns the identifier used to label metrics for this datastore.
// It is derived from the datastore's URL (mds.url) by common.MetricsIDFromURL,
// whose error is returned as-is.
func (mds *Datastore) MetricsID() (string, error) {
return common.MetricsIDFromURL(mds.url)
}

func (mds *Datastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
createTxFunc := func(ctx context.Context) (*sql.Tx, txCleanupFunc, error) {
tx, err := mds.db.BeginTx(ctx, mds.readTxOptions)
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ type pgDatastore struct {
filterMaximumIDCount uint16
}

// MetricsID returns the identifier used to label metrics for this datastore.
// It is derived from the datastore's connection URL (pgd.dburl) by
// common.MetricsIDFromURL, whose error is returned as-is.
func (pgd *pgDatastore) MetricsID() (string, error) {
return common.MetricsIDFromURL(pgd.dburl)
}

func (pgd *pgDatastore) IsStrictReadModeEnabled() bool {
return pgd.inStrictReadMode
}
Expand Down
25 changes: 22 additions & 3 deletions internal/datastore/proxy/checkingreplicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ var (
Help: "total number of readers created by the checking replica proxy",
})

checkingReplicatedReplicaReaderCount = promauto.NewCounter(prometheus.CounterOpts{
checkingReplicatedReplicaReaderCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "checking_replicated_replica_reader_total",
Help: "number of readers created by the checking replica proxy that are using the replica",
})
}, []string{"replica"})

readReplicatedSelectedReplicaCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "selected_replica_total",
Help: "the selected replica in a read replicated datastore",
}, []string{"replica"})
)

// NewCheckingReplicatedDatastore creates a new datastore that writes to the provided primary and reads
Expand Down Expand Up @@ -89,6 +96,12 @@ type checkingReplicatedDatastore struct {
// Any errors establishing the reader will be returned by subsequent calls.
func (rd *checkingReplicatedDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader {
replica := selectReplica(rd.replicas, &rd.lastReplica)
replicaID, err := replica.MetricsID()
if err != nil {
log.Warn().Err(err).Msg("unable to determine metrics ID for replica")
replicaID = "unknown"
}
readReplicatedSelectedReplicaCount.WithLabelValues(replicaID).Inc()
return &checkingStableReader{
rev: revision,
replica: replica,
Expand Down Expand Up @@ -223,7 +236,13 @@ func (rr *checkingStableReader) determineSource(ctx context.Context) error {
}
log.Trace().Str("revision", rr.rev.String()).Msg("replica contains the requested revision")

checkingReplicatedReplicaReaderCount.Inc()
metricsID, err := rr.replica.MetricsID()
if err != nil {
log.Warn().Err(err).Msg("unable to determine metrics ID for replica")
metricsID = "unknown"
}

checkingReplicatedReplicaReaderCount.WithLabelValues(metricsID).Inc()
rr.chosenReader = rr.replica.SnapshotReader(rr.rev)
rr.chosePrimaryForTest = false
})
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/checkingreplicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type fakeDatastore struct {
revision datastore.Revision
}

// MetricsID returns the constant identifier "fake" and never returns an error.
func (f fakeDatastore) MetricsID() (string, error) {
return "fake", nil
}

func (f fakeDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader {
return fakeSnapshotReader{
revision: revision,
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {

type observableProxy struct{ delegate datastore.Datastore }

// MetricsID returns the metrics identifier reported by the wrapped delegate
// datastore, passing through both the value and any error unchanged.
func (p *observableProxy) MetricsID() (string, error) {
return p.delegate.MetricsID()
}

func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
delegateReader := p.delegate.SnapshotReader(rev)
return &observableReader{delegateReader}
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader
return args.Get(0).(datastore.Reader)
}

// MetricsID returns the constant identifier "mock" and never returns an error.
func (dm *MockDatastore) MetricsID() (string, error) {
return "mock", nil
}

func (dm *MockDatastore) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/relationshipintegrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func computeRelationshipHash(rel tuple.Relationship, key *hmacConfig) ([]byte, e
return hasher.Sum(nil)[:hashLength], nil
}

// MetricsID returns the metrics identifier reported by the underlying
// datastore (r.ds), passing through both the value and any error unchanged.
func (r *relationshipIntegrityProxy) MetricsID() (string, error) {
return r.ds.MetricsID()
}

func (r *relationshipIntegrityProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return relationshipIntegrityReader{
parent: r,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ type fakeDatastore struct {
lock sync.RWMutex
}

// MetricsID returns the constant identifier "fake" and never returns an error.
func (fds *fakeDatastore) MetricsID() (string, error) {
return "fake", nil
}

func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
fds.lock.Lock()
defer fds.lock.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type singleflightProxy struct {

var _ datastore.Datastore = (*singleflightProxy)(nil)

// MetricsID returns the metrics identifier reported by the wrapped delegate
// datastore, passing through both the value and any error unchanged.
func (p *singleflightProxy) MetricsID() (string, error) {
return p.delegate.MetricsID()
}

func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return p.delegate.SnapshotReader(rev)
}
Expand Down
28 changes: 18 additions & 10 deletions internal/datastore/proxy/strictreplicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ var (
Help: "total number of reads made by the strict read replicated datastore",
})

strictReadReplicatedFallbackQueryCount = promauto.NewCounter(prometheus.CounterOpts{
strictReadReplicatedFallbackQueryCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "strict_replicated_fallback_query_total",
Help: "number of queries that have fallen back to the primary datastore",
})
}, []string{"replica"})
)

// NewStrictReplicatedDatastore creates a new datastore that writes to the provided primary and reads
Expand Down Expand Up @@ -77,10 +77,17 @@ type strictReplicatedDatastore struct {
// Any errors establishing the reader will be returned by subsequent calls.
func (rd *strictReplicatedDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader {
replica := selectReplica(rd.replicas, &rd.lastReplica)
replicaID, err := replica.MetricsID()
if err != nil {
log.Error().Err(err).Msg("failed to get replica metrics ID")
replicaID = "unknown"
}

return &strictReadReplicatedReader{
rev: revision,
replica: replica,
primary: rd.Datastore,
rev: revision,
replica: replica,
replicaID: replicaID,
primary: rd.Datastore,
}
}

Expand All @@ -92,9 +99,10 @@ func (rd *strictReplicatedDatastore) SnapshotReader(revision datastore.Revision)
// read mode enabled, to ensure the query will fail with a RevisionUnavailableError if the revision is
// not available.
type strictReadReplicatedReader struct {
rev datastore.Revision
replica datastore.ReadOnlyDatastore
primary datastore.Datastore
rev datastore.Revision
replica datastore.ReadOnlyDatastore
replicaID string
primary datastore.Datastore
}

func (rr *strictReadReplicatedReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
Expand Down Expand Up @@ -150,7 +158,7 @@ func queryRelationships[F any, O any](
if err != nil {
if errors.As(err, &common.RevisionUnavailableError{}) {
log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary")
strictReadReplicatedFallbackQueryCount.Inc()
strictReadReplicatedFallbackQueryCount.WithLabelValues(rr.replicaID).Inc()
return handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...)
}
return nil, err
Expand Down Expand Up @@ -187,7 +195,7 @@ func queryRelationships[F any, O any](

if requiresFallback {
log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary")
strictReadReplicatedFallbackQueryCount.Inc()
strictReadReplicatedFallbackQueryCount.WithLabelValues(rr.replicaID).Inc()
pit, err := handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...)
if err != nil {
yield(tuple.Relationship{}, err)
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datas
return spannerReader{executor, txSource, sd.filterMaximumIDCount, sd.schema}
}

// MetricsID returns the datastore's database value (sd.database) as the
// identifier used to label metrics. It never returns an error.
func (sd *spannerDatastore) MetricsID() (string, error) {
return sd.database, nil
}

func (sd *spannerDatastore) readTransactionMetadata(ctx context.Context, transactionTag string) (map[string]any, error) {
row, err := sd.client.Single().ReadRow(ctx, tableTransactionMetadata, spanner.Key{transactionTag}, []string{colMetadata})
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio

// ReadOnlyDatastore is an interface for reading relationships from the datastore.
type ReadOnlyDatastore interface {
// MetricsID returns an identifier for the datastore for use in metrics.
// This identifier is typically derived from the datastore's connection URL
// (its host and path, where applicable) and may not be unique; callers should
// not rely on uniqueness.
MetricsID() (string, error)

// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
// Any errors establishing the reader will be returned by subsequent calls.
SnapshotReader(Revision) Reader
Expand Down
4 changes: 4 additions & 0 deletions pkg/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,10 @@ func (f fakeDatastore) Unwrap() Datastore {
return f.delegate
}

// MetricsID returns the constant identifier "fake" and never returns an error.
func (f fakeDatastore) MetricsID() (string, error) {
return "fake", nil
}

func (f fakeDatastore) SnapshotReader(_ Revision) Reader {
return nil
}
Expand Down
Loading