Skip to content

Commit

Permalink
Merge pull request #2152 from josephschorr/rel-expiration-e2e
Browse files Browse the repository at this point in the history
End to end support for experimental first-class relationship expiration feature
  • Loading branch information
josephschorr authored Dec 6, 2024
2 parents 5e21207 + d749ed4 commit 4d0a80d
Show file tree
Hide file tree
Showing 78 changed files with 1,554 additions and 209 deletions.
16 changes: 16 additions & 0 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ var (
Help: "The number of stale relationships deleted by the datastore garbage collection.",
})

gcExpiredRelationshipsCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore",
Name: "gc_expired_relationships_total",
Help: "The number of expired relationships deleted by the datastore garbage collection.",
})

gcTransactionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore",
Expand Down Expand Up @@ -81,6 +88,7 @@ type GarbageCollector interface {
Now(context.Context) (time.Time, error)
TxIDBefore(context.Context, time.Time) (datastore.Revision, error)
DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (DeletionCounts, error)
DeleteExpiredRels(ctx context.Context) (int64, error)
}

// DeletionCounts tracks the amount of deletions that occurred when calling
Expand Down Expand Up @@ -185,23 +193,31 @@ func RunGarbageCollection(gc GarbageCollector, window, timeout time.Duration) er

collected, err := gc.DeleteBeforeTx(ctx, watermark)

expiredRelationshipsCount, eerr := gc.DeleteExpiredRels(ctx)

// even if an error happened, garbage would have been collected. This makes sure these are reflected even if the
// worker eventually fails or times out.
gcRelationshipsCounter.Add(float64(collected.Relationships))
gcTransactionsCounter.Add(float64(collected.Transactions))
gcNamespacesCounter.Add(float64(collected.Namespaces))
gcExpiredRelationshipsCounter.Add(float64(expiredRelationshipsCount))
collectionDuration := time.Since(startTime)
gcDurationHistogram.Observe(collectionDuration.Seconds())

if err != nil {
return fmt.Errorf("error deleting in gc: %w", err)
}

if eerr != nil {
return fmt.Errorf("error deleting expired relationships in gc: %w", eerr)
}

log.Ctx(ctx).Info().
Stringer("highestTxID", watermark).
Dur("duration", collectionDuration).
Time("nowTime", now).
Interface("collected", collected).
Int64("expiredRelationships", expiredRelationshipsCount).
Msg("datastore garbage collection completed successfully")

gc.MarkGCCompleted()
Expand Down
25 changes: 22 additions & 3 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type fakeGC struct {
}

type gcMetrics struct {
deleteBeforeTxCount int
markedCompleteCount int
resetGCCompletedCount int
deleteBeforeTxCount int
markedCompleteCount int
resetGCCompletedCount int
deleteExpiredRelsCount int
}

func newFakeGC(deleter gcDeleter) fakeGC {
Expand Down Expand Up @@ -71,6 +72,15 @@ func (gc *fakeGC) DeleteBeforeTx(_ context.Context, rev datastore.Revision) (Del
return gc.deleter.DeleteBeforeTx(revInt)
}

func (gc *fakeGC) DeleteExpiredRels(_ context.Context) (int64, error) {
gc.lock.Lock()
defer gc.lock.Unlock()

gc.metrics.deleteExpiredRelsCount++

return gc.deleter.DeleteExpiredRels()
}

func (gc *fakeGC) HasGCRun() bool {
gc.lock.Lock()
defer gc.lock.Unlock()
Expand Down Expand Up @@ -102,6 +112,7 @@ func (gc *fakeGC) GetMetrics() gcMetrics {
// Allows specifying different deletion behaviors for tests
type gcDeleter interface {
DeleteBeforeTx(revision uint64) (DeletionCounts, error)
DeleteExpiredRels() (int64, error)
}

// Always error trying to perform a delete
Expand All @@ -111,6 +122,10 @@ func (alwaysErrorDeleter) DeleteBeforeTx(_ uint64) (DeletionCounts, error) {
return DeletionCounts{}, fmt.Errorf("delete error")
}

func (alwaysErrorDeleter) DeleteExpiredRels() (int64, error) {
return 0, fmt.Errorf("delete error")
}

// Only error on specific revisions
type revisionErrorDeleter struct {
errorOnRevisions []uint64
Expand All @@ -124,6 +139,10 @@ func (d revisionErrorDeleter) DeleteBeforeTx(revision uint64) (DeletionCounts, e
return DeletionCounts{}, nil
}

func (d revisionErrorDeleter) DeleteExpiredRels() (int64, error) {
return 0, nil
}

func TestGCFailureBackoff(t *testing.T) {
localCounter := prometheus.NewCounter(gcFailureCounterConfig)
reg := prometheus.NewRegistry()
Expand Down
20 changes: 20 additions & 0 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ type SchemaInformation struct {
colUsersetObjectID string
colUsersetRelation string
colCaveatName string
colExpiration string
paginationFilterType PaginationFilterType
nowFunction string
}

func NewSchemaInformation(
Expand All @@ -82,7 +84,9 @@ func NewSchemaInformation(
colUsersetObjectID,
colUsersetRelation,
colCaveatName string,
colExpiration string,
paginationFilterType PaginationFilterType,
nowFunction string,
) SchemaInformation {
return SchemaInformation{
colNamespace,
Expand All @@ -92,7 +96,9 @@ func NewSchemaInformation(
colUsersetObjectID,
colUsersetRelation,
colCaveatName,
colExpiration,
paginationFilterType,
nowFunction,
}
}

Expand All @@ -112,6 +118,12 @@ func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuil
log.Warn().Msg("SchemaQueryFilterer: filterMaximumIDCount not set, defaulting to 100")
}

// Filter out any expired relationships.
initialQuery = initialQuery.Where(sq.Or{
sq.Eq{schema.colExpiration: nil},
sq.Expr(schema.colExpiration + " > " + schema.nowFunction + "()"),
})

return SchemaQueryFilterer{
schema: schema,
queryBuilder: initialQuery,
Expand Down Expand Up @@ -404,6 +416,12 @@ func (sqf SchemaQueryFilterer) FilterWithRelationshipsFilter(filter datastore.Re
csqf = csqf.FilterWithCaveatName(filter.OptionalCaveatName)
}

if filter.OptionalExpirationOption == datastore.ExpirationFilterOptionHasExpiration {
csqf.queryBuilder = csqf.queryBuilder.Where(sq.NotEq{csqf.schema.colExpiration: nil})
} else if filter.OptionalExpirationOption == datastore.ExpirationFilterOptionNoExpiration {
csqf.queryBuilder = csqf.queryBuilder.Where(sq.Eq{csqf.schema.colExpiration: nil})
}

return csqf, nil
}

Expand Down Expand Up @@ -563,6 +581,8 @@ func (tqs QueryExecutor) ExecuteQuery(
}

toExecute := query.limit(limit)

// Run the query.
sql, args, err := toExecute.queryBuilder.ToSql()
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 4d0a80d

Please sign in to comment.