Skip to content

Commit

Permalink
Address further feedback:
Browse files Browse the repository at this point in the history
- Have QueryRelationships add events to the parent span directly
- Cleanup CRDB system time handling and add debug-time assertions for "as of system time"
- Additional testing
  • Loading branch information
josephschorr committed Jan 8, 2025
1 parent 52991f7 commit f71404b
Show file tree
Hide file tree
Showing 15 changed files with 336 additions and 94 deletions.
5 changes: 2 additions & 3 deletions internal/datastore/common/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ type closeRows interface {
}

// QueryRelationships queries relationships for the given query and transaction.
func QueryRelationships[R Rows, C ~map[string]any](ctx context.Context, builder RelationshipsQueryBuilder, span trace.Span, tx Querier[R]) (datastore.RelationshipIterator, error) {
defer span.End()

func QueryRelationships[R Rows, C ~map[string]any](ctx context.Context, builder RelationshipsQueryBuilder, tx Querier[R]) (datastore.RelationshipIterator, error) {
span := trace.SpanFromContext(ctx)
sqlString, args, err := builder.SelectSQL()
if err != nil {
return nil, fmt.Errorf(errUnableToQueryRels, err)
Expand Down
13 changes: 12 additions & 1 deletion internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ func (exc QueryRelationshipsExecutor) ExecuteQuery(
Schema: query.schema,
SkipCaveats: queryOpts.SkipCaveats,
SkipExpiration: queryOpts.SkipExpiration,
sqlAssertion: queryOpts.SQLAssertion,
filteringValues: query.filteringColumnTracker,
baseQueryBuilder: query,
}
Expand All @@ -682,6 +683,7 @@ type RelationshipsQueryBuilder struct {

filteringValues columnTrackerMap
baseQueryBuilder SchemaQueryFilterer
sqlAssertion options.Assertion
}

// withCaveats returns true if caveats should be included in the query.
Expand Down Expand Up @@ -745,7 +747,16 @@ func (b RelationshipsQueryBuilder) SelectSQL() (string, []any, error) {
sqlBuilder := b.baseQueryBuilder.queryBuilderWithMaybeExpirationFilter(b.SkipExpiration)
sqlBuilder = sqlBuilder.Columns(columnNamesToSelect...)

return sqlBuilder.ToSql()
sql, args, err := sqlBuilder.ToSql()
if err != nil {
return "", nil, err
}

if b.sqlAssertion != nil {
b.sqlAssertion(sql)
}

return sql, args, nil
}

// FilteringValuesForTesting returns the filtering values. For test use only.
Expand Down
6 changes: 4 additions & 2 deletions internal/datastore/crdb/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ const (
)

func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
query := cr.fromWithAsOfSystemTime(readCaveat.Where(sq.Eq{colCaveatName: name}), tableCaveat)
query := cr.addFromToQuery(readCaveat.Where(sq.Eq{colCaveatName: name}), tableCaveat)
sql, args, err := query.ToSql()
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err)
}
cr.assertHasExpectedAsOfSystemTime(sql)

var definitionBytes []byte
var timestamp time.Time
Expand Down Expand Up @@ -79,7 +80,7 @@ type bytesAndTimestamp struct {
}

func (cr *crdbReader) lookupCaveats(ctx context.Context, caveatNames []string) ([]datastore.RevisionedCaveat, error) {
caveatsWithNames := cr.fromWithAsOfSystemTime(listCaveat, tableCaveat)
caveatsWithNames := cr.addFromToQuery(listCaveat, tableCaveat)
if len(caveatNames) > 0 {
caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames})
}
Expand All @@ -88,6 +89,7 @@ func (cr *crdbReader) lookupCaveats(ctx context.Context, caveatNames []string) (
if err != nil {
return nil, fmt.Errorf(errListCaveats, err)
}
cr.assertHasExpectedAsOfSystemTime(sql)

var allDefinitionBytes []bytesAndTimestamp

Expand Down
40 changes: 21 additions & 19 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,16 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
executor := common.QueryRelationshipsExecutor{
Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(cds.readPool),
}

withAsOfSystemTime := func(query sq.SelectBuilder, tableName string) sq.SelectBuilder {
return query.From(tableName + " AS OF SYSTEM TIME " + rev.String())
return &crdbReader{
schema: cds.schema,
query: cds.readPool,
executor: executor,
keyer: noOverlapKeyer,
overlapKeySet: nil,
filterMaximumIDCount: cds.filterMaximumIDCount,
withIntegrity: cds.supportsIntegrity,
atSpecificRevision: rev.String(),
}

asOfSystemTimeSuffix := "AS OF SYSTEM TIME " + rev.String()
return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, withAsOfSystemTime, asOfSystemTimeSuffix, cds.filterMaximumIDCount, cds.schema, cds.supportsIntegrity}
}

func (cds *crdbDatastore) ReadWriteTx(
Expand Down Expand Up @@ -399,20 +402,19 @@ func (cds *crdbDatastore) ReadWriteTx(
return fmt.Errorf("error writing metadata: %w", err)
}

reader := &crdbReader{
schema: cds.schema,
query: querier,
executor: executor,
keyer: cds.writeOverlapKeyer,
overlapKeySet: cds.overlapKeyInit(ctx),
filterMaximumIDCount: cds.filterMaximumIDCount,
withIntegrity: cds.supportsIntegrity,
atSpecificRevision: "", // No AS OF SYSTEM TIME for writes
}

rwt := &crdbReadWriteTXN{
&crdbReader{
querier,
executor,
cds.writeOverlapKeyer,
cds.overlapKeyInit(ctx),
func(query sq.SelectBuilder, tableName string) sq.SelectBuilder {
return query.From(tableName)
},
"", // No AS OF SYSTEM TIME for writes
cds.filterMaximumIDCount,
cds.schema,
cds.supportsIntegrity,
},
reader,
tx,
0,
}
Expand Down
94 changes: 70 additions & 24 deletions internal/datastore/crdb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

sq "github.com/Masterminds/squirrel"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
)

const (
Expand All @@ -38,15 +40,42 @@ var (
)

type crdbReader struct {
query pgxcommon.DBFuncQuerier
executor common.QueryRelationshipsExecutor
keyer overlapKeyer
overlapKeySet keySet
fromWithAsOfSystemTime func(query sq.SelectBuilder, tableName string) sq.SelectBuilder
asOfSystemTimeSuffix string
filterMaximumIDCount uint16
schema common.SchemaInformation
withIntegrity bool
schema common.SchemaInformation
query pgxcommon.DBFuncQuerier
executor common.QueryRelationshipsExecutor
keyer overlapKeyer
overlapKeySet keySet
filterMaximumIDCount uint16
withIntegrity bool
atSpecificRevision string
}

const asOfSystemTime = "AS OF SYSTEM TIME"

func (cr *crdbReader) addFromToQuery(query sq.SelectBuilder, tableName string) sq.SelectBuilder {
if cr.atSpecificRevision == "" {
return query.From(tableName)
}

return query.From(tableName + " " + asOfSystemTime + " " + cr.atSpecificRevision)
}

func (cr *crdbReader) fromSuffix() string {
if cr.atSpecificRevision == "" {
return ""
}

return " " + asOfSystemTime + " " + cr.atSpecificRevision
}

func (cr *crdbReader) assertHasExpectedAsOfSystemTime(sql string) {
spiceerrors.DebugAssert(func() bool {
if cr.atSpecificRevision == "" {
return !strings.Contains(sql, "AS OF SYSTEM TIME")
} else {
return strings.Contains(sql, "AS OF SYSTEM TIME")
}
}, "mismatch in AS OF SYSTEM TIME in query: %s", sql)
}

func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int, error) {
Expand All @@ -64,7 +93,7 @@ func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int,
return 0, err
}

query := cr.fromWithAsOfSystemTime(countRels, cr.schema.RelationshipTableName)
query := cr.addFromToQuery(countRels, cr.schema.RelationshipTableName)
builder, err := common.NewSchemaQueryFiltererWithStartingQuery(cr.schema, query, cr.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
if err != nil {
return 0, err
Expand All @@ -74,6 +103,7 @@ func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int,
if err != nil {
return 0, err
}
cr.assertHasExpectedAsOfSystemTime(sql)

var count int
err = cr.query.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
Expand All @@ -93,7 +123,7 @@ func (cr *crdbReader) LookupCounters(ctx context.Context) ([]datastore.Relations
}

func (cr *crdbReader) lookupCounters(ctx context.Context, optionalFilterName string) ([]datastore.RelationshipCounter, error) {
query := cr.fromWithAsOfSystemTime(queryCounters, tableRelationshipCounter)
query := cr.addFromToQuery(queryCounters, tableRelationshipCounter)
if optionalFilterName != noFilterOnCounterName {
query = query.Where(sq.Eq{colCounterName: optionalFilterName})
}
Expand All @@ -102,6 +132,7 @@ func (cr *crdbReader) lookupCounters(ctx context.Context, optionalFilterName str
if err != nil {
return nil, err
}
cr.assertHasExpectedAsOfSystemTime(sql)

var counters []datastore.RelationshipCounter
err = cr.query.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error {
Expand Down Expand Up @@ -165,10 +196,11 @@ func (cr *crdbReader) ReadNamespaceByName(
}

func (cr *crdbReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) {
nsDefs, err := loadAllNamespaces(ctx, cr.query, cr.fromWithAsOfSystemTime)
nsDefs, sql, err := loadAllNamespaces(ctx, cr.query, cr.addFromToQuery)
if err != nil {
return nil, fmt.Errorf(errUnableToListNamespaces, err)
}
cr.assertHasExpectedAsOfSystemTime(sql)
return nsDefs, nil
}

Expand All @@ -188,11 +220,15 @@ func (cr *crdbReader) QueryRelationships(
filter datastore.RelationshipsFilter,
opts ...options.QueryOptionsOption,
) (iter datastore.RelationshipIterator, err error) {
qBuilder, err := common.NewSchemaQueryFiltererForRelationshipsSelect(cr.schema, cr.filterMaximumIDCount).WithFromSuffix(cr.asOfSystemTimeSuffix).FilterWithRelationshipsFilter(filter)
qBuilder, err := common.NewSchemaQueryFiltererForRelationshipsSelect(cr.schema, cr.filterMaximumIDCount).WithFromSuffix(cr.fromSuffix()).FilterWithRelationshipsFilter(filter)
if err != nil {
return nil, err
}

if spiceerrors.DebugAssertionsEnabled {
opts = append(opts, options.WithSQLAssertion(cr.assertHasExpectedAsOfSystemTime))
}

return cr.executor.ExecuteQuery(ctx, qBuilder, opts...)
}

Expand All @@ -202,34 +238,43 @@ func (cr *crdbReader) ReverseQueryRelationships(
opts ...options.ReverseQueryOptionsOption,
) (iter datastore.RelationshipIterator, err error) {
qBuilder, err := common.NewSchemaQueryFiltererForRelationshipsSelect(cr.schema, cr.filterMaximumIDCount).
WithFromSuffix(cr.asOfSystemTimeSuffix).
WithFromSuffix(cr.fromSuffix()).
FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
if err != nil {
return nil, err
}

queryOpts := options.NewReverseQueryOptionsWithOptions(opts...)

if queryOpts.ResRelation != nil {
qBuilder = qBuilder.
FilterToResourceType(queryOpts.ResRelation.Namespace).
FilterToRelation(queryOpts.ResRelation.Relation)
}

eopts := []options.QueryOptionsOption{
options.WithLimit(queryOpts.LimitForReverse),
options.WithAfter(queryOpts.AfterForReverse),
options.WithSort(queryOpts.SortForReverse),
}

if spiceerrors.DebugAssertionsEnabled {
eopts = append(eopts, options.WithSQLAssertion(cr.assertHasExpectedAsOfSystemTime))
}

return cr.executor.ExecuteQuery(
ctx,
qBuilder,
options.WithLimit(queryOpts.LimitForReverse),
options.WithAfter(queryOpts.AfterForReverse),
options.WithSort(queryOpts.SortForReverse))
eopts...,
)
}

func (cr crdbReader) loadNamespace(ctx context.Context, tx pgxcommon.DBFuncQuerier, nsName string) (*core.NamespaceDefinition, time.Time, error) {
query := cr.fromWithAsOfSystemTime(queryReadNamespace, tableNamespace).Where(sq.Eq{colNamespace: nsName})
query := cr.addFromToQuery(queryReadNamespace, tableNamespace).Where(sq.Eq{colNamespace: nsName})
sql, args, err := query.ToSql()
if err != nil {
return nil, time.Time{}, err
}
cr.assertHasExpectedAsOfSystemTime(sql)

var config []byte
var timestamp time.Time
Expand Down Expand Up @@ -258,11 +303,12 @@ func (cr crdbReader) lookupNamespaces(ctx context.Context, tx pgxcommon.DBFuncQu
clause = append(clause, sq.Eq{colNamespace: nsName})
}

query := cr.fromWithAsOfSystemTime(queryReadNamespace, tableNamespace).Where(clause)
query := cr.addFromToQuery(queryReadNamespace, tableNamespace).Where(clause)
sql, args, err := query.ToSql()
if err != nil {
return nil, err
}
cr.assertHasExpectedAsOfSystemTime(sql)

var nsDefs []datastore.RevisionedNamespace

Expand Down Expand Up @@ -297,11 +343,11 @@ func (cr crdbReader) lookupNamespaces(ctx context.Context, tx pgxcommon.DBFuncQu
return nsDefs, nil
}

func loadAllNamespaces(ctx context.Context, tx pgxcommon.DBFuncQuerier, fromBuilder func(sq.SelectBuilder, string) sq.SelectBuilder) ([]datastore.RevisionedNamespace, error) {
func loadAllNamespaces(ctx context.Context, tx pgxcommon.DBFuncQuerier, fromBuilder func(sq.SelectBuilder, string) sq.SelectBuilder) ([]datastore.RevisionedNamespace, string, error) {
query := fromBuilder(queryReadNamespace, tableNamespace)
sql, args, err := query.ToSql()
if err != nil {
return nil, err
return nil, sql, err
}

var nsDefs []datastore.RevisionedNamespace
Expand Down Expand Up @@ -331,10 +377,10 @@ func loadAllNamespaces(ctx context.Context, tx pgxcommon.DBFuncQuerier, fromBuil
return nil
}, sql, args...)
if err != nil {
return nil, err
return nil, sql, err
}

return nsDefs, nil
return nsDefs, sql, nil
}

func (cr *crdbReader) addOverlapKey(namespace string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
if err != nil {
return fmt.Errorf("unable to read namespaces: %w", err)
}
nsDefs, err = loadAllNamespaces(ctx, pgxcommon.QuerierFuncsFor(tx), func(sb squirrel.SelectBuilder, tableName string) squirrel.SelectBuilder {
nsDefs, _, err = loadAllNamespaces(ctx, pgxcommon.QuerierFuncsFor(tx), func(sb squirrel.SelectBuilder, tableName string) squirrel.SelectBuilder {
return sb.From(tableName)
})
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

datastoreinternal "github.com/authzed/spicedb/internal/datastore"
Expand Down Expand Up @@ -446,8 +445,8 @@ func (aqt asQueryableTx) QueryFunc(ctx context.Context, f func(context.Context,
return err
}

if rows.Err() != nil {
return rows.Err()
if err := rows.Err(); err != nil {
return err
}

return f(ctx, rows)
Expand All @@ -470,8 +469,7 @@ func newMySQLExecutor(tx querier) common.ExecuteReadRelsQueryFunc {
// Prepared statements are also not used given they perform poorly on environments where connections have
// short lifetime (e.g. to gracefully handle load-balancer connection drain)
return func(ctx context.Context, builder common.RelationshipsQueryBuilder) (datastore.RelationshipIterator, error) {
span := trace.SpanFromContext(ctx)
return common.QueryRelationships[common.Rows, structpbWrapper](ctx, builder, span, asQueryableTx{tx})
return common.QueryRelationships[common.Rows, structpbWrapper](ctx, builder, asQueryableTx{tx})
}
}

Expand Down
Loading

0 comments on commit f71404b

Please sign in to comment.