Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: schema watch support for mysql driver #2224

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
func TestMySQL8Datastore(t *testing.T) {
b := testdatastore.RunMySQLForTestingWithOptions(t, testdatastore.MySQLTesterOptions{MigrateForNewDatastore: true}, "")
dst := datastoreTester{b: b, t: t}
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory, test.WatchCheckpointsCategory), true)
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchCheckpointsCategory), true)
additionalMySQLTests(t, b)
}

Expand Down
29 changes: 25 additions & 4 deletions internal/datastore/mysql/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type QueryBuilder struct {
ReadNamespaceQuery sq.SelectBuilder
DeleteNamespaceQuery sq.UpdateBuilder
DeleteNamespaceRelationshipsQuery sq.UpdateBuilder
QueryChangedNamespacesQuery sq.SelectBuilder

ReadCounterQuery sq.SelectBuilder
InsertCounterQuery sq.InsertBuilder
Expand All @@ -30,10 +31,11 @@ type QueryBuilder struct {
QueryChangedQuery sq.SelectBuilder
CountRelsQuery sq.SelectBuilder

WriteCaveatQuery sq.InsertBuilder
ReadCaveatQuery sq.SelectBuilder
ListCaveatsQuery sq.SelectBuilder
DeleteCaveatQuery sq.UpdateBuilder
WriteCaveatQuery sq.InsertBuilder
ReadCaveatQuery sq.SelectBuilder
ListCaveatsQuery sq.SelectBuilder
DeleteCaveatQuery sq.UpdateBuilder
QueryChangedCaveatsQuery sq.SelectBuilder
}

// NewQueryBuilder returns a new QueryBuilder instance. The migration
Expand All @@ -49,6 +51,7 @@ func NewQueryBuilder(driver *migrations.MySQLDriver) *QueryBuilder {
builder.WriteNamespaceQuery = writeNamespace(driver.Namespace())
builder.ReadNamespaceQuery = readNamespace(driver.Namespace())
builder.DeleteNamespaceQuery = deleteNamespace(driver.Namespace())
builder.QueryChangedNamespacesQuery = changedNamespaces(driver.Namespace())

// counters builders
builder.ReadCounterQuery = readCounter(driver.RelationshipCounters())
Expand All @@ -71,6 +74,7 @@ func NewQueryBuilder(driver *migrations.MySQLDriver) *QueryBuilder {
builder.ListCaveatsQuery = listCaveats(driver.Caveat())
builder.WriteCaveatQuery = writeCaveat(driver.Caveat())
builder.DeleteCaveatQuery = deleteCaveat(driver.Caveat())
builder.QueryChangedCaveatsQuery = changedCaveats(driver.Caveat())

return &builder
}
Expand Down Expand Up @@ -223,3 +227,20 @@ func queryChanged(tableTuple string) sq.SelectBuilder {
colDeletedTxn,
).From(tableTuple)
}

func changedCaveats(tableTuple string) sq.SelectBuilder {
return sb.Select(
colName,
colCaveatDefinition,
colCreatedTxn,
colDeletedTxn,
).From(tableTuple)
}

func changedNamespaces(tableTuple string) sq.SelectBuilder {
return sb.Select(
colConfig,
colCreatedTxn,
colDeletedTxn,
).From(tableTuple)
}
160 changes: 151 additions & 9 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package mysql
import (
"context"
"errors"
"fmt"
"time"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"

sq "github.com/Masterminds/squirrel"
Expand All @@ -29,12 +31,6 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
close(updates)
errs <- errors.New("schema watch unsupported in MySQL")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MySQL")
Expand Down Expand Up @@ -177,7 +173,30 @@ func (mds *Datastore) loadChanges(
}

// Load the changes relationships for the revision range.
sql, args, err = mds.QueryChangedQuery.Where(sq.Or{
if err := mds.loadRelationshipChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}

// Load namespace changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}

// Load caveat changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}

changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
return
}

func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
Expand All @@ -191,7 +210,7 @@ func (mds *Datastore) loadChanges(
return
}

rows, err = mds.db.QueryContext(ctx, sql, args...)
rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
Expand Down Expand Up @@ -265,7 +284,130 @@ func (mds *Datastore) loadChanges(
if err = rows.Err(); err != nil {
return
}
return
}

func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}
defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte

err = rows.Scan(
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return
}
loaded := &core.NamespaceDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf("unable to parse changed namespace: %w", err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

return
}

func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}

defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte
var name string

err = rows.Scan(
&name,
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return fmt.Errorf("unable to parse changed caveat: %w", err)
}
loaded := &core.CaveatDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf(errUnableToReadConfig, err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
return
}
Loading