From a08027bf9ea70d90d7e8188d9a9612bc376143cf Mon Sep 17 00:00:00 2001 From: Braden Groom Date: Tue, 17 Oct 2023 21:43:08 -0500 Subject: [PATCH 1/2] Run postgres datastore tests with pgbouncer --- internal/datastore/postgres/postgres_test.go | 49 +++-- internal/testserver/datastore/datastore.go | 2 +- internal/testserver/datastore/postgres.go | 199 +++++++++++++++---- 3 files changed, 188 insertions(+), 62 deletions(-) diff --git a/internal/datastore/postgres/postgres_test.go b/internal/datastore/postgres/postgres_test.go index a80f9f3072..52b4417542 100644 --- a/internal/datastore/postgres/postgres_test.go +++ b/internal/datastore/postgres/postgres_test.go @@ -15,6 +15,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/samber/lo" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/trace" @@ -43,10 +44,25 @@ func (pgd *pgDatastore) ExampleRetryableError() error { } } +type postgresConfig struct { + targetMigration string + migrationPhase string + pgVersion string + pgbouncer bool +} + // the global OTel tracer is used everywhere, so we synchronize tests over a global test tracer var ( otelMutex = sync.Mutex{} testTraceProvider *trace.TracerProvider + postgresConfigs = lo.FlatMap( + []string{pgversion.MinimumSupportedPostgresVersion, "14", "15", "16"}, + func(postgresVersion string, _ int) []postgresConfig { + return lo.Map([]bool{false, true}, func(enablePgbouncer bool, _ int) postgresConfig { + return postgresConfig{"head", "", postgresVersion, enablePgbouncer} + }) + }, + ) ) func init() { @@ -59,20 +75,14 @@ func init() { func TestPostgresDatastore(t *testing.T) { t.Parallel() - for _, config := range []struct { - targetMigration string - migrationPhase string - pgVersion string - }{ - {"head", "", pgversion.MinimumSupportedPostgresVersion}, - {"head", "", "14"}, - {"head", "", "15"}, - {"head", "", "16"}, - } { - config := config - t.Run(fmt.Sprintf("postgres-%s-%s-%s", config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) { + for _, config := range postgresConfigs { + pgbouncerStr := "" + if config.pgbouncer { + pgbouncerStr = "pgbouncer-" + } + t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) { t.Parallel() - b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion) + b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer) test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { @@ -191,12 +201,13 @@ func TestPostgresDatastore(t *testing.T) { func TestPostgresDatastoreWithoutCommitTimestamps(t *testing.T) { t.Parallel() - for _, pgVersion := range []string{pgversion.MinimumSupportedPostgresVersion, "14", "15", "16"} { - pgVersion := pgVersion + for _, config := range postgresConfigs { + pgVersion := config.pgVersion + enablePgbouncer := config.pgbouncer t.Run(fmt.Sprintf("postgres-%s", pgVersion), func(t *testing.T) { t.Parallel() - b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion) + b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer) // NOTE: watch API requires the commit timestamps, so we skip those tests here. test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { @@ -1130,7 +1141,7 @@ func OTelTracingTest(t *testing.T, ds datastore.Datastore) { func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgVersion string) { require := require.New(t) - ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion).NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion, false).NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(uri, RevisionQuantization(0), GCWindow(time.Millisecond*1), @@ -1154,7 +1165,7 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV func BenchmarkPostgresQuery(b *testing.B) { req := require.New(b) - ds := testdatastore.RunPostgresForTesting(b, "", migrate.Head, pgversion.MinimumSupportedPostgresVersion).NewDatastore(b, func(engine, uri string) datastore.Datastore { + ds := testdatastore.RunPostgresForTesting(b, "", migrate.Head, pgversion.MinimumSupportedPostgresVersion, false).NewDatastore(b, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(uri, RevisionQuantization(0), GCWindow(time.Millisecond*1), @@ -1188,7 +1199,7 @@ func BenchmarkPostgresQuery(b *testing.B) { func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.QueryInterceptor, pgVersion string) datastore.Datastore { require := require.New(t) - ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion).NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion, false).NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(uri, RevisionQuantization(0), GCWindow(time.Millisecond*1), diff --git a/internal/testserver/datastore/datastore.go b/internal/testserver/datastore/datastore.go index 856e611179..3f9a65cba9 100644 --- a/internal/testserver/datastore/datastore.go +++ b/internal/testserver/datastore/datastore.go @@ -63,7 +63,7 @@ func RunDatastoreEngineWithBridge(t testing.TB, engine string, bridgeNetworkName case "cockroachdb": return RunCRDBForTesting(t, bridgeNetworkName) case "postgres": - return RunPostgresForTesting(t, bridgeNetworkName, migrate.Head, version.MinimumSupportedPostgresVersion) + return RunPostgresForTesting(t, bridgeNetworkName, migrate.Head, version.MinimumSupportedPostgresVersion, false) case "mysql": return RunMySQLForTesting(t, bridgeNetworkName) case "spanner": diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go index 64001c70a8..2257413c75 100644 --- a/internal/testserver/datastore/postgres.go +++ b/internal/testserver/datastore/postgres.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/require" pgmigrations "github.com/authzed/spicedb/internal/datastore/postgres/migrations" @@ -19,69 +20,91 @@ import ( "github.com/authzed/spicedb/pkg/secrets" ) +const ( + POSTGRES_TEST_USER = "postgres" + POSTGRES_TEST_PASSWORD = "secret" + POSTGRES_TEST_PORT = "5432" + POSTGRES_TEST_MAX_CONNECTIONS = "500" + PGBOUNCER_TEST_PORT = "6432" +) + +type container struct { + hostHostname string + hostPort string + containerHostname string + containerPort string +} + type postgresTester struct { - conn *pgx.Conn - hostname string - port string - creds string - targetMigration string + container + hostConn *pgx.Conn + creds string + targetMigration string + pgbouncerProxy *container + useContainerHostname bool } // RunPostgresForTesting returns a RunningEngineForTest for postgres -func RunPostgresForTesting(t testing.TB, bridgeNetworkName string, targetMigration string, pgVersion string) RunningEngineForTest { - return RunPostgresForTestingWithCommitTimestamps(t, bridgeNetworkName, targetMigration, true, pgVersion) +func RunPostgresForTesting(t testing.TB, bridgeNetworkName string, targetMigration string, pgVersion string, enablePgbouncer bool) RunningEngineForTest { + return RunPostgresForTestingWithCommitTimestamps(t, bridgeNetworkName, targetMigration, true, pgVersion, enablePgbouncer) } -func RunPostgresForTestingWithCommitTimestamps(t testing.TB, bridgeNetworkName string, targetMigration string, withCommitTimestamps bool, pgVersion string) RunningEngineForTest { +func RunPostgresForTestingWithCommitTimestamps(t testing.TB, bridgeNetworkName string, targetMigration string, withCommitTimestamps bool, pgVersion string, enablePgbouncer bool) RunningEngineForTest { pool, err := dockertest.NewPool("") require.NoError(t, err) - name := fmt.Sprintf("postgres-%s", uuid.New().String()) + bridgeSupplied := bridgeNetworkName != "" + if enablePgbouncer && !bridgeSupplied { + // We will need a network bridge if we're running pgbouncer + bridgeNetworkName = createNetworkBridge(t, pool) + } + + postgresContainerHostname := fmt.Sprintf("postgres-%s", uuid.New().String()) - cmd := []string{"-N", "500"} // Max Connections + cmd := []string{"-N", POSTGRES_TEST_MAX_CONNECTIONS} if withCommitTimestamps { cmd = append(cmd, "-c", "track_commit_timestamp=1") } - resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - Name: name, - Repository: "postgres", - Tag: pgVersion, - Env: []string{"POSTGRES_PASSWORD=secret", "POSTGRES_DB=defaultdb"}, - ExposedPorts: []string{"5432/tcp"}, + postgres, err := pool.RunWithOptions(&dockertest.RunOptions{ + Name: postgresContainerHostname, + Repository: "postgres", + Tag: pgVersion, + Env: []string{ + "POSTGRES_USER=" + POSTGRES_TEST_USER, + "POSTGRES_PASSWORD=" + POSTGRES_TEST_PASSWORD, + // use md5 auth to align postgres and pgbouncer auth methods + "POSTGRES_HOST_AUTH_METHOD=md5", + "POSTGRES_INITDB_ARGS=--auth=md5", + }, + ExposedPorts: []string{POSTGRES_TEST_PORT + "/tcp"}, NetworkID: bridgeNetworkName, Cmd: cmd, }) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, pool.Purge(postgres)) + }) builder := &postgresTester{ - hostname: "localhost", - creds: "postgres:secret", - targetMigration: targetMigration, + container: container{ + hostHostname: "localhost", + hostPort: postgres.GetPort(POSTGRES_TEST_PORT + "/tcp"), + containerHostname: postgresContainerHostname, + containerPort: POSTGRES_TEST_PORT, + }, + creds: POSTGRES_TEST_USER + ":" + POSTGRES_TEST_PASSWORD, + targetMigration: targetMigration, + useContainerHostname: bridgeSupplied, } - t.Cleanup(func() { - require.NoError(t, pool.Purge(resource)) - }) - port := resource.GetPort(fmt.Sprintf("%d/tcp", 5432)) - if bridgeNetworkName != "" { - builder.hostname = name - builder.port = "5432" - } else { - builder.port = port + if enablePgbouncer { + // if we are running with pgbouncer enabled then set it up + builder.runPgbouncerForTesting(t, pool, bridgeNetworkName) } - uri := fmt.Sprintf("postgres://%s@localhost:%s/defaultdb?sslmode=disable", builder.creds, port) - require.NoError(t, pool.Retry(func() error { - var err error - ctx, cancelConnect := context.WithTimeout(context.Background(), dockerBootTimeout) - defer cancelConnect() - builder.conn, err = pgx.Connect(ctx, uri) - if err != nil { - return err - } - return nil - })) + builder.hostConn = builder.initializeHostConnection(t, pool) + return builder } @@ -91,14 +114,15 @@ func (b *postgresTester) NewDatabase(t testing.TB) string { newDBName := "db" + uniquePortion - _, err = b.conn.Exec(context.Background(), "CREATE DATABASE "+newDBName) + _, err = b.hostConn.Exec(context.Background(), "CREATE DATABASE "+newDBName) require.NoError(t, err) + hostname, port := b.getHostnameAndPort() return fmt.Sprintf( "postgres://%s@%s:%s/%s?sslmode=disable", b.creds, - b.hostname, - b.port, + hostname, + port, newDBName, ) } @@ -113,3 +137,94 @@ func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore return initFunc("postgres", connectStr) } + +func createNetworkBridge(t testing.TB, pool *dockertest.Pool) string { + bridgeNetworkName := fmt.Sprintf("bridge-%s", uuid.New().String()) + network, err := pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: bridgeNetworkName}) + + require.NoError(t, err) + t.Cleanup(func() { + pool.Client.RemoveNetwork(network.ID) + }) + + return bridgeNetworkName +} + +func (b *postgresTester) runPgbouncerForTesting(t testing.TB, pool *dockertest.Pool, bridgeNetworkName string) { + uniqueID := uuid.New().String() + pgbouncerContainerHostname := fmt.Sprintf("pgbouncer-%s", uniqueID) + + pgbouncer, err := pool.RunWithOptions(&dockertest.RunOptions{ + Name: pgbouncerContainerHostname, + Repository: "edoburu/pgbouncer", + Tag: "latest", + Env: []string{ + "DB_USER=" + POSTGRES_TEST_USER, + "DB_PASSWORD=" + POSTGRES_TEST_PASSWORD, + "DB_HOST=" + b.containerHostname, + "DB_PORT=" + b.containerPort, + "LISTEN_PORT=" + PGBOUNCER_TEST_PORT, + "DB_NAME=*", // Needed to make pgbouncer okay with the randomly named databases generated by the test suite + "AUTH_TYPE=md5", // use the same auth type as postgres + "MAX_CLIENT_CONN=" + POSTGRES_TEST_MAX_CONNECTIONS, + // params needed for spicedb + "POOL_MODE=session", // https://github.com/authzed/spicedb/issues/1217 + "IGNORE_STARTUP_PARAMETERS=plan_cache_mode", // Tell pgbouncer to pass this param thru to postgres. + }, + ExposedPorts: []string{PGBOUNCER_TEST_PORT + "/tcp"}, + NetworkID: bridgeNetworkName, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, pool.Purge(pgbouncer)) + }) + + b.pgbouncerProxy = &container{ + hostHostname: "localhost", + hostPort: pgbouncer.GetPort(PGBOUNCER_TEST_PORT + "/tcp"), + containerHostname: pgbouncerContainerHostname, + containerPort: PGBOUNCER_TEST_PORT, + } +} + +func (b *postgresTester) initializeHostConnection(t testing.TB, pool *dockertest.Pool) (conn *pgx.Conn) { + hostname, port := b.getHostHostnameAndPort() + uri := fmt.Sprintf("postgresql://%s@%s:%s/?sslmode=disable", b.creds, hostname, port) + err := pool.Retry(func() error { + var err error + ctx, cancelConnect := context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancelConnect() + conn, err = pgx.Connect(ctx, uri) + if err != nil { + return err + } + return nil + }) + require.NoError(t, err) + return conn +} + +func (b *postgresTester) getHostnameAndPort() (string, string) { + // If a bridgeNetworkName is supplied then we will return the container + // hostname and port that is resolvable from within the container network. + // If bridgeNetworkName is not supplied then the hostname and port will be + // resolvable from the host. + if b.useContainerHostname { + return b.getContainerHostnameAndPort() + } + return b.getHostHostnameAndPort() +} + +func (b *postgresTester) getHostHostnameAndPort() (string, string) { + if b.pgbouncerProxy != nil { + return b.pgbouncerProxy.hostHostname, b.pgbouncerProxy.hostPort + } + return b.hostHostname, b.hostPort +} + +func (b *postgresTester) getContainerHostnameAndPort() (string, string) { + if b.pgbouncerProxy != nil { + return b.pgbouncerProxy.containerHostname, b.pgbouncerProxy.containerPort + } + return b.containerHostname, b.containerPort +} From 8d1dc41f736a9a5eacb011de9641924b60c85a48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Fri, 27 Oct 2023 10:44:31 +0100 Subject: [PATCH 2/2] implement postgres+pgbouncer as a separate test This is done solely so that CI builds don't take too long. As a solution this is run as a separate datastore test leveraging build tags consistency tests are left out of scope for now --- .github/workflows/build-test.yaml | 2 +- internal/datastore/postgres/pgbouncer_test.go | 31 + .../postgres/postgres_shared_test.go | 1379 +++++++++++++++++ internal/datastore/postgres/postgres_test.go | 1374 +--------------- magefiles/test.go | 21 +- 5 files changed, 1433 insertions(+), 1374 deletions(-) create mode 100644 internal/datastore/postgres/pgbouncer_test.go create mode 100644 internal/datastore/postgres/postgres_shared_test.go diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 558910edf9..4e95504a2d 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -94,7 +94,7 @@ jobs: strategy: fail-fast: false matrix: - datastore: ["crdb", "mysql", "postgres", "spanner"] + datastore: ["crdb", "mysql", "postgres", "spanner", "pgbouncer"] steps: - uses: "actions/checkout@v3" - uses: "authzed/actions/setup-go@main" diff --git a/internal/datastore/postgres/pgbouncer_test.go b/internal/datastore/postgres/pgbouncer_test.go new file mode 100644 index 0000000000..cf736f2f61 --- /dev/null +++ b/internal/datastore/postgres/pgbouncer_test.go @@ -0,0 +1,31 @@ +//go:build ci && docker && pgbouncer +// +build ci,docker,pgbouncer + +package postgres + +import ( + "testing" + + pgversion "github.com/authzed/spicedb/internal/datastore/postgres/version" + + "github.com/samber/lo" +) + +var pgbouncerConfigs = lo.Map( + []string{pgversion.MinimumSupportedPostgresVersion, "14", "15", "16"}, + func(postgresVersion string, _ int) postgresConfig { + return postgresConfig{"head", "", postgresVersion, true} + }, +) + +func TestPostgresWithPgBouncerDatastore(t *testing.T) { + t.Parallel() + + testPostgresDatastore(t, pgbouncerConfigs) +} + +func TestPostgresDatastoreWithPgBouncerWithoutCommitTimestamps(t *testing.T) { + t.Parallel() + + testPostgresDatastoreWithoutCommitTimestamps(t, pgbouncerConfigs) +} diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go new file mode 100644 index 0000000000..ab7208ef89 --- /dev/null +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -0,0 +1,1379 @@ +//go:build ci && docker +// +build ci,docker + +package postgres + +import ( + "context" + "fmt" + "math/rand" + "strings" + "sync" + "testing" + "time" + + sq "github.com/Masterminds/squirrel" + "github.com/authzed/spicedb/internal/datastore/common" + pgcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + pgversion "github.com/authzed/spicedb/internal/datastore/postgres/version" + "github.com/authzed/spicedb/internal/testfixtures" + testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/test" + "github.com/authzed/spicedb/pkg/migrate" + "github.com/authzed/spicedb/pkg/namespace" + core "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/tuple" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/samber/lo" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "golang.org/x/sync/errgroup" +) + +const pgSerializationFailure = "40001" + +// Implement the TestableDatastore interface +func (pgd *pgDatastore) ExampleRetryableError() error { + return &pgconn.PgError{ + Code: pgSerializationFailure, + } +} + +type postgresConfig struct { + targetMigration string + migrationPhase string + pgVersion string + pgbouncer bool +} + +// the global OTel tracer is used everywhere, so we synchronize tests over a global test tracer +var ( + otelMutex = sync.Mutex{} + testTraceProvider *trace.TracerProvider + postgresConfigs = lo.Map( + []string{pgversion.MinimumSupportedPostgresVersion, "14", "15", "16"}, + func(postgresVersion string, _ int) postgresConfig { + return postgresConfig{"head", "", postgresVersion, false} + }, + ) +) + +func init() { + testTraceProvider = trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + ) + otel.SetTracerProvider(testTraceProvider) +} + +func testPostgresDatastore(t *testing.T, pc []postgresConfig) { + for _, config := range pc { + pgbouncerStr := "" + if config.pgbouncer { + pgbouncerStr = "pgbouncer-" + } + t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) { + t.Parallel() + b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer) + + test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newPostgresDatastore(uri, + RevisionQuantization(revisionQuantization), + GCWindow(gcWindow), + GCInterval(gcInterval), + WatchBufferLength(watchBufferLength), + DebugAnalyzeBeforeStatistics(), + MigrationPhase(config.migrationPhase), + ) + require.NoError(t, err) + return ds + }) + return ds, nil + })) + + t.Run("GarbageCollection", createDatastoreTest( + b, + GarbageCollectionTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + + t.Run("TransactionTimestamps", createDatastoreTest( + b, + TransactionTimestampsTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + + t.Run("GarbageCollectionByTime", createDatastoreTest( + b, + GarbageCollectionByTimeTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + + t.Run("ChunkedGarbageCollection", createDatastoreTest( + b, + ChunkedGarbageCollectionTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + + t.Run("QuantizedRevisions", func(t *testing.T) { + QuantizedRevisionTest(t, b) + }) + + t.Run("WatchNotEnabled", func(t *testing.T) { + WatchNotEnabledTest(t, b, config.pgVersion) + }) + + t.Run("GCQueriesServedByExpectedIndexes", func(t *testing.T) { + GCQueriesServedByExpectedIndexes(t, b, config.pgVersion) + }) + + if config.migrationPhase == "" { + t.Run("RevisionInversion", createDatastoreTest( + b, + RevisionInversionTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + + t.Run("ConcurrentRevisionHead", createDatastoreTest( + b, + ConcurrentRevisionHeadTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + + t.Run("ConcurrentRevisionWatch", createDatastoreTest( + b, + ConcurrentRevisionWatchTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + )) + + t.Run("OverlappingRevisionWatch", createDatastoreTest( + b, + OverlappingRevisionWatchTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + )) + } + + t.Run("OTelTracing", createDatastoreTest( + b, + OTelTracingTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(1), + MigrationPhase(config.migrationPhase), + )) + }) + } +} + +func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, pc []postgresConfig) { + for _, config := range pc { + pgVersion := config.pgVersion + enablePgbouncer := config.pgbouncer + t.Run(fmt.Sprintf("postgres-%s", pgVersion), func(t *testing.T) { + t.Parallel() + + b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer) + + // NOTE: watch API requires the commit timestamps, so we skip those tests here. + test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newPostgresDatastore(uri, + RevisionQuantization(revisionQuantization), + GCWindow(gcWindow), + GCInterval(gcInterval), + WatchBufferLength(watchBufferLength), + DebugAnalyzeBeforeStatistics(), + ) + require.NoError(t, err) + return ds + }) + return ds, nil + }), test.WithCategories(test.WatchCategory)) + }) + } +} + +type datastoreTestFunc func(t *testing.T, ds datastore.Datastore) + +func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { + return func(t *testing.T) { + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newPostgresDatastore(uri, options...) + require.NoError(t, err) + return ds + }) + defer ds.Close() + + tf(t, ds) + } +} + +func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + firstWrite, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + // Write basic namespaces. + return rwt.WriteNamespaces(ctx, namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + ), namespace.Namespace("user")) + }) + require.NoError(err) + + // Run GC at the transaction and ensure no relationships are removed. + pds := ds.(*pgDatastore) + + // Nothing to GC + removed, err := pds.DeleteBeforeTx(ctx, firstWrite) + require.NoError(err) + require.Zero(removed.Relationships) + require.Zero(removed.Namespaces) + + // Replace the namespace with a new one. + updateTwoNamespaces, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces( + ctx, + namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + namespace.MustRelation("unused", nil), + ), + namespace.Namespace("user"), + ) + }) + require.NoError(err) + + // Run GC to remove the old transaction + removed, err = pds.DeleteBeforeTx(ctx, updateTwoNamespaces) + require.NoError(err) + require.Zero(removed.Relationships) + require.Equal(int64(1), removed.Transactions) // firstWrite + require.Equal(int64(2), removed.Namespaces) // resource, user + + // Write a relationship. + tpl := tuple.Parse("resource:someresource#reader@user:someuser#...") + + wroteOneRelationship, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) + require.NoError(err) + + // Run GC at the transaction and ensure no relationships are removed, but 1 transaction (the previous write namespace) is. + removed, err = pds.DeleteBeforeTx(ctx, wroteOneRelationship) + require.NoError(err) + require.Zero(removed.Relationships) + require.Equal(int64(1), removed.Transactions) // updateTwoNamespaces + require.Zero(removed.Namespaces) + + // Run GC again and ensure there are no changes. + removed, err = pds.DeleteBeforeTx(ctx, wroteOneRelationship) + require.NoError(err) + require.Zero(removed.Relationships) + require.Zero(removed.Transactions) + require.Zero(removed.Namespaces) + + // Ensure the relationship is still present. + tRequire := testfixtures.TupleChecker{Require: require, DS: ds} + tRequire.TupleExists(ctx, tpl, wroteOneRelationship) + + // Overwrite the relationship by changing its caveat. + tpl = tuple.MustWithCaveat(tpl, "somecaveat") + relOverwrittenAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH, tpl) + require.NoError(err) + + // Run GC, which won't clean anything because we're dropping the write transaction only + removed, err = pds.DeleteBeforeTx(ctx, relOverwrittenAt) + require.NoError(err) + require.Equal(int64(1), removed.Relationships) // wroteOneRelationship + require.Equal(int64(1), removed.Transactions) // wroteOneRelationship + require.Zero(removed.Namespaces) + + // Run GC again and ensure there are no changes. + removed, err = pds.DeleteBeforeTx(ctx, relOverwrittenAt) + require.NoError(err) + require.Zero(removed.Relationships) + require.Zero(removed.Transactions) + require.Zero(removed.Namespaces) + + // Ensure the relationship is still present. + tRequire.TupleExists(ctx, tpl, relOverwrittenAt) + + // Delete the relationship. + relDeletedAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tpl) + require.NoError(err) + + // Ensure the relationship is gone. + tRequire.NoTupleExists(ctx, tpl, relDeletedAt) + + // Run GC, which will now drop the overwrite transaction only and the first tpl revision + removed, err = pds.DeleteBeforeTx(ctx, relDeletedAt) + require.NoError(err) + require.Equal(int64(1), removed.Relationships) + require.Equal(int64(1), removed.Transactions) // relOverwrittenAt + require.Zero(removed.Namespaces) + + // Run GC again and ensure there are no changes. + removed, err = pds.DeleteBeforeTx(ctx, relDeletedAt) + require.NoError(err) + require.Zero(removed.Relationships) + require.Zero(removed.Transactions) + require.Zero(removed.Namespaces) + + // Write a the relationship a few times. + var relLastWriteAt datastore.Revision + for i := 0; i < 3; i++ { + tpl = tuple.MustWithCaveat(tpl, fmt.Sprintf("somecaveat%d", i)) + + var err error + relLastWriteAt, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH, tpl) + require.NoError(err) + } + + // Run GC at the transaction and ensure the older copies of the relationships are removed, + // as well as the 2 older write transactions and the older delete transaction. + removed, err = pds.DeleteBeforeTx(ctx, relLastWriteAt) + require.NoError(err) + require.Equal(int64(2), removed.Relationships) // delete, old1 + require.Equal(int64(3), removed.Transactions) // removed, write1, write2 + require.Zero(removed.Namespaces) + + // Ensure the relationship is still present. + tRequire.TupleExists(ctx, tpl, relLastWriteAt) + + // Inject a transaction to clean up the last write + lastRev, err := pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return nil + }) + require.NoError(err) + + // Run GC to clean up the last write + removed, err = pds.DeleteBeforeTx(ctx, lastRev) + require.NoError(err) + require.Zero(removed.Relationships) // write3 + require.Equal(int64(1), removed.Transactions) // write3 + require.Zero(removed.Namespaces) +} + +func TransactionTimestampsTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + + // Setting db default time zone to before UTC + pgd := ds.(*pgDatastore) + _, err = pgd.writePool.Exec(ctx, "SET TIME ZONE 'America/New_York';") + require.NoError(err) + + // Get timestamp in UTC as reference + startTimeUTC, err := pgd.Now(ctx) + require.NoError(err) + + // Transaction timestamp should not be stored in system time zone + tx, err := pgd.writePool.Begin(ctx) + require.NoError(err) + + txXID, _, err := createNewTransaction(ctx, tx) + require.NoError(err) + + err = tx.Commit(ctx) + require.NoError(err) + + var ts time.Time + sql, args, err := psql.Select("timestamp").From(tableTransaction).Where(sq.Eq{"xid": txXID}).ToSql() + require.NoError(err) + err = pgd.readPool.QueryRow(ctx, sql, args...).Scan(&ts) + require.NoError(err) + + // Transaction timestamp will be before the reference time if it was stored + // in the default time zone and reinterpreted + require.True(startTimeUTC.Before(ts)) +} + +func GarbageCollectionByTimeTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + // Write basic namespaces. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + ), namespace.Namespace("user")) + }) + require.NoError(err) + + pds := ds.(*pgDatastore) + + // Sleep 1ms to ensure GC will delete the previous transaction. + time.Sleep(1 * time.Millisecond) + + // Write a relationship. + tpl := tuple.Parse("resource:someresource#reader@user:someuser#...") + relLastWriteAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) + require.NoError(err) + + // Run GC and ensure only transactions were removed. + afterWrite, err := pds.Now(ctx) + require.NoError(err) + + afterWriteTx, err := pds.TxIDBefore(ctx, afterWrite) + require.NoError(err) + + removed, err := pds.DeleteBeforeTx(ctx, afterWriteTx) + require.NoError(err) + require.Zero(removed.Relationships) + require.True(removed.Transactions > 0) + require.Zero(removed.Namespaces) + + // Ensure the relationship is still present. + tRequire := testfixtures.TupleChecker{Require: require, DS: ds} + tRequire.TupleExists(ctx, tpl, relLastWriteAt) + + // Sleep 1ms to ensure GC will delete the previous write. + time.Sleep(1 * time.Millisecond) + + // Delete the relationship. + relDeletedAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tpl) + require.NoError(err) + + // Inject a revision to sweep up the last revision + _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return nil + }) + require.NoError(err) + + // Run GC and ensure the relationship is not removed. + afterDelete, err := pds.Now(ctx) + require.NoError(err) + + afterDeleteTx, err := pds.TxIDBefore(ctx, afterDelete) + require.NoError(err) + + removed, err = pds.DeleteBeforeTx(ctx, afterDeleteTx) + require.NoError(err) + require.Equal(int64(1), removed.Relationships) + require.Equal(int64(2), removed.Transactions) // relDeletedAt, injected + require.Zero(removed.Namespaces) + + // Ensure the relationship is still not present. + tRequire.NoTupleExists(ctx, tpl, relDeletedAt) +} + +const chunkRelationshipCount = 2000 + +func ChunkedGarbageCollectionTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + // Write basic namespaces. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + ), namespace.Namespace("user")) + }) + require.NoError(err) + + pds := ds.(*pgDatastore) + + // Prepare relationships to write. + var tpls []*core.RelationTuple + for i := 0; i < chunkRelationshipCount; i++ { + tpl := tuple.Parse(fmt.Sprintf("resource:resource-%d#reader@user:someuser#...", i)) + tpls = append(tpls, tpl) + } + + // Write a large number of relationships. + writtenAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpls...) + require.NoError(err) + + // Ensure the relationships were written. + tRequire := testfixtures.TupleChecker{Require: require, DS: ds} + for _, tpl := range tpls { + tRequire.TupleExists(ctx, tpl, writtenAt) + } + + // Run GC and ensure only transactions were removed. + afterWrite, err := pds.Now(ctx) + require.NoError(err) + + afterWriteTx, err := pds.TxIDBefore(ctx, afterWrite) + require.NoError(err) + + removed, err := pds.DeleteBeforeTx(ctx, afterWriteTx) + require.NoError(err) + require.Zero(removed.Relationships) + require.True(removed.Transactions > 0) + require.Zero(removed.Namespaces) + + // Sleep to ensure the relationships will GC. + time.Sleep(1 * time.Millisecond) + + // Delete all the relationships. + deletedAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tpls...) + require.NoError(err) + + // Inject a revision to sweep up the last revision + _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return nil + }) + require.NoError(err) + + // Ensure the relationships were deleted. + for _, tpl := range tpls { + tRequire.NoTupleExists(ctx, tpl, deletedAt) + } + + // Sleep to ensure GC. + time.Sleep(1 * time.Millisecond) + + // Run GC and ensure all the stale relationships are removed. + afterDelete, err := pds.Now(ctx) + require.NoError(err) + + afterDeleteTx, err := pds.TxIDBefore(ctx, afterDelete) + require.NoError(err) + + removed, err = pds.DeleteBeforeTx(ctx, afterDeleteTx) + require.NoError(err) + require.Equal(int64(chunkRelationshipCount), removed.Relationships) + require.Equal(int64(2), removed.Transactions) + require.Zero(removed.Namespaces) +} + +func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { + testCases := []struct { + testName string + quantization time.Duration + relativeTimes []time.Duration + numLower uint64 + numHigher uint64 + }{ + { + "DefaultRevision", + 1 * time.Second, + []time.Duration{}, + 0, 0, + }, + { + "OnlyPastRevisions", + 1 * time.Second, + []time.Duration{-2 * time.Second}, + 1, 0, + }, + { + "OnlyFutureRevisions", + 1 * time.Second, + []time.Duration{2 * time.Second}, + 0, 1, + }, + { + "QuantizedLower", + 2 * time.Second, + []time.Duration{-4 * time.Second, -1 * time.Nanosecond, 0}, + 1, 2, + }, + { + "QuantizationDisabled", + 1 * time.Nanosecond, + []time.Duration{-2 * time.Second, -1 * time.Nanosecond, 0}, + 3, 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + require := require.New(t) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var conn *pgx.Conn + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + var err error + conn, err = pgx.Connect(ctx, uri) + RegisterTypes(conn.TypeMap()) + require.NoError(err) + + ds, err := newPostgresDatastore( + uri, + RevisionQuantization(5*time.Second), + GCWindow(24*time.Hour), + WatchBufferLength(1), + ) + require.NoError(err) + + return ds + }) + defer ds.Close() + + // set a random time zone to ensure the queries are unaffected by tz + _, err := conn.Exec(ctx, fmt.Sprintf("SET TIME ZONE -%d", rand.Intn(8)+1)) + require.NoError(err) + + var dbNow time.Time + err = conn.QueryRow(ctx, "SELECT (NOW() AT TIME ZONE 'utc')").Scan(&dbNow) + require.NoError(err) + + if len(tc.relativeTimes) > 0 { + psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) + insertTxn := psql.Insert(tableTransaction).Columns(colTimestamp) + + for _, offset := range tc.relativeTimes { + sql, args, err := insertTxn.Values(dbNow.Add(offset)).ToSql() + require.NoError(err) + + _, err = conn.Exec(ctx, sql, args...) + require.NoError(err) + } + } + + queryRevision := fmt.Sprintf( + querySelectRevision, + colXID, + tableTransaction, + colTimestamp, + tc.quantization.Nanoseconds(), + colSnapshot, + ) + + var revision xid8 + var snapshot pgSnapshot + var validFor time.Duration + err = conn.QueryRow(ctx, queryRevision).Scan(&revision, &snapshot, &validFor) + require.NoError(err) + + queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;" + numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true") + numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false") + + var numLower, numHigher uint64 + require.NoError(conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot) + require.NoError(conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot) + + // Subtract one from numLower because of the artificially injected first transaction row + require.Equal(tc.numLower, numLower-1) + require.Equal(tc.numHigher, numHigher) + }) + } +} + +// ConcurrentRevisionHeadTest uses goroutines and channels to intentionally set up a pair of +// revisions that are concurrently applied and then ensures a call to HeadRevision reflects +// the changes found in *both* revisions. +func ConcurrentRevisionHeadTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + // Write basic namespaces. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + ), namespace.Namespace("user")) + }) + require.NoError(err) + + g := errgroup.Group{} + + waitToStart := make(chan struct{}) + waitToFinish := make(chan struct{}) + + var commitLastRev, commitFirstRev datastore.Revision + g.Go(func() error { + var err error + commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: "resource", + ObjectId: "123", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) + require.NoError(err) + + close(waitToStart) + <-waitToFinish + + return err + }) + require.NoError(err) + return nil + }) + + <-waitToStart + + commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: "resource", + ObjectId: "789", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) + }) + close(waitToFinish) + + require.NoError(err) + require.NoError(g.Wait()) + + // Ensure the revisions do not compare. + require.False(commitFirstRev.GreaterThan(commitLastRev)) + require.False(commitFirstRev.Equal(commitLastRev)) + + // Ensure a call to HeadRevision now reflects both sets of data applied. + headRev, err := ds.HeadRevision(ctx) + require.NoError(err) + + reader := ds.SnapshotReader(headRev) + it, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{ + ResourceType: "resource", + }) + require.NoError(err) + defer it.Close() + + found := []*core.RelationTuple{} + for tpl := it.Next(); tpl != nil; tpl = it.Next() { + require.NoError(it.Err()) + found = append(found, tpl) + } + + require.Equal(2, len(found), "missing relationships in %v", found) +} + +// ConcurrentRevisionWatchTest uses goroutines and channels to intentionally set up a pair of +// revisions that are concurrently applied and then ensures that a Watch call does not end up +// in a loop. +func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + withCancel, cancel := context.WithCancel(ctx) + defer cancel() + + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + + // Write basic namespaces. + rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + ), namespace.Namespace("user")) + }) + require.NoError(err) + + // Start a watch loop. + waitForWatch := make(chan struct{}) + seenWatchRevisions := make([]datastore.Revision, 0) + seenWatchRevisionsLock := sync.Mutex{} + + go func() { + changes, _ := ds.Watch(withCancel, rev) + + waitForWatch <- struct{}{} + + for { + select { + case change, ok := <-changes: + if !ok { + return + } + + seenWatchRevisionsLock.Lock() + seenWatchRevisions = append(seenWatchRevisions, change.Revision) + seenWatchRevisionsLock.Unlock() + + time.Sleep(1 * time.Millisecond) + case <-withCancel.Done(): + return + } + } + }() + + <-waitForWatch + + // Write the two concurrent transactions, while watching for changes. + g := errgroup.Group{} + + waitToStart := make(chan struct{}) + waitToFinish := make(chan struct{}) + + var commitLastRev, commitFirstRev datastore.Revision + g.Go(func() error { + var err error + commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ + tuple.Touch(tuple.MustParse("something:001#viewer@user:123")), + tuple.Touch(tuple.MustParse("something:002#viewer@user:123")), + tuple.Touch(tuple.MustParse("something:003#viewer@user:123")), + }) + require.NoError(err) + + close(waitToStart) + <-waitToFinish + + return err + }) + require.NoError(err) + return nil + }) + + <-waitToStart + + commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ + tuple.Touch(tuple.MustParse("resource:1001#reader@user:456")), + tuple.Touch(tuple.MustParse("resource:1002#reader@user:456")), + tuple.Touch(tuple.MustParse("resource:1003#reader@user:456")), + }) + }) + close(waitToFinish) + + require.NoError(err) + require.NoError(g.Wait()) + + // Ensure the revisions do not compare. + require.False(commitFirstRev.GreaterThan(commitLastRev)) + require.False(commitLastRev.GreaterThan(commitFirstRev)) + require.False(commitFirstRev.Equal(commitLastRev)) + + // Write another revision. + afterRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: "resource", + ObjectId: "2345", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) + }) + require.NoError(err) + require.True(afterRev.GreaterThan(commitFirstRev)) + require.True(afterRev.GreaterThan(commitLastRev)) + + // Ensure that the last revision is eventually seen from the watch. + require.Eventually(func() bool { + seenWatchRevisionsLock.Lock() + defer seenWatchRevisionsLock.Unlock() + return len(seenWatchRevisions) == 3 && seenWatchRevisions[len(seenWatchRevisions)-1].String() == afterRev.String() + }, 2*time.Second, 5*time.Millisecond) +} + +func OverlappingRevisionWatchTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + + rev, err := ds.HeadRevision(ctx) + require.NoError(err) + + pds := ds.(*pgDatastore) + require.True(pds.watchEnabled) + + prev := rev.(postgresRevision) + nexttx := prev.snapshot.xmax + 1 + + // Manually construct an equivalent of overlapping transactions in the database, from the repro + // information (See: https://github.com/authzed/spicedb/issues/1272) + err = pgx.BeginTxFunc(ctx, pds.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { + _, err := tx.Exec(ctx, fmt.Sprintf( + `INSERT INTO %s ("%s", "%s") VALUES ('%d', '%d:%d:')`, + tableTransaction, + colXID, + colSnapshot, + nexttx, + nexttx, + nexttx, + )) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, fmt.Sprintf( + `INSERT INTO %s ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s") VALUES ('somenamespace', '123', 'viewer', 'user', '456', '...', '', null, '%d'::xid8)`, + tableTuple, + colNamespace, + colObjectID, + colRelation, + colUsersetNamespace, + colUsersetObjectID, + colUsersetRelation, + colCaveatContextName, + colCaveatContext, + colCreatedXid, + nexttx, + )) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, fmt.Sprintf( + `INSERT INTO %s ("xid", "snapshot") VALUES ('%d', '%d:%d:')`, + tableTransaction, + nexttx+1, + nexttx, + nexttx, + )) + + if err != nil { + return err + } + + _, err = tx.Exec(ctx, fmt.Sprintf( + `INSERT INTO %s ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s") VALUES ('somenamespace', '456', 'viewer', 'user', '456', '...', '', null, '%d'::xid8)`, + tableTuple, + colNamespace, + colObjectID, + colRelation, + colUsersetNamespace, + colUsersetObjectID, + colUsersetRelation, + colCaveatContextName, + colCaveatContext, + colCreatedXid, + nexttx+1, + )) + + return err + }) + require.NoError(err) + + // Call watch and ensure it terminates with having only read the two expected sets of changes. + changes, errChan := ds.Watch(ctx, rev) + transactionCount := 0 +loop: + for { + select { + case _, ok := <-changes: + if !ok { + err := <-errChan + require.NoError(err) + return + } + + transactionCount++ + time.Sleep(10 * time.Millisecond) + case <-time.NewTimer(1 * time.Second).C: + break loop + } + } + + require.Equal(2, transactionCount) +} + +// RevisionInversionTest uses goroutines and channels to intentionally set up a pair of +// revisions that might compare incorrectly. +func RevisionInversionTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + // Write basic namespaces. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, namespace.Namespace( + "resource", + namespace.MustRelation("reader", nil), + ), namespace.Namespace("user")) + }) + require.NoError(err) + + g := errgroup.Group{} + + waitToStart := make(chan struct{}) + waitToFinish := make(chan struct{}) + + var commitLastRev, commitFirstRev datastore.Revision + g.Go(func() error { + var err error + commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: "resource", + ObjectId: "123", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) + require.NoError(err) + + close(waitToStart) + <-waitToFinish + + return err + }) + require.NoError(err) + return nil + }) + + <-waitToStart + + commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: "resource", + ObjectId: "789", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "ten", + Relation: "...", + }, + }) + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) + }) + close(waitToFinish) + + require.NoError(err) + require.NoError(g.Wait()) + require.False(commitFirstRev.GreaterThan(commitLastRev)) + require.False(commitFirstRev.Equal(commitLastRev)) +} + +func OTelTracingTest(t *testing.T, ds datastore.Datastore) { + otelMutex.Lock() + defer otelMutex.Unlock() + + require := require.New(t) + + ctx := context.Background() + r, err := ds.ReadyState(ctx) + require.NoError(err) + require.True(r.IsReady) + + spanrecorder := tracetest.NewSpanRecorder() + testTraceProvider.RegisterSpanProcessor(spanrecorder) + + // Perform basic operation + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteNamespaces(ctx, namespace.Namespace("resource")) + }) + require.NoError(err) + + ended := spanrecorder.Ended() + var present bool + for _, span := range ended { + if span.Name() == "query INSERT" { + present = true + } + } + require.True(present, "missing trace for Streaming gRPC call") +} + +func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgVersion string) { + require := require.New(t) + + ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion, false).NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newPostgresDatastore(uri, + RevisionQuantization(0), + GCWindow(time.Millisecond*1), + WatchBufferLength(1), + ) + require.NoError(err) + return ds + }) + defer ds.Close() + + ds, revision := testfixtures.StandardDatastoreWithData(ds, require) + _, errChan := ds.Watch( + context.Background(), + revision, + ) + err := <-errChan + require.NotNil(err) + require.Contains(err.Error(), "track_commit_timestamp=on") +} + +func BenchmarkPostgresQuery(b *testing.B) { + req := require.New(b) + + ds := testdatastore.RunPostgresForTesting(b, "", migrate.Head, pgversion.MinimumSupportedPostgresVersion, false).NewDatastore(b, func(engine, uri string) datastore.Datastore { + ds, err := newPostgresDatastore(uri, + RevisionQuantization(0), + GCWindow(time.Millisecond*1), + WatchBufferLength(1), + ) + require.NoError(b, err) + return ds + }) + defer ds.Close() + ds, revision := testfixtures.StandardDatastoreWithData(ds, req) + + b.Run("benchmark checks", func(b *testing.B) { + require := require.New(b) + + for i := 0; i < b.N; i++ { + iter, err := ds.SnapshotReader(revision).QueryRelationships(context.Background(), datastore.RelationshipsFilter{ + ResourceType: testfixtures.DocumentNS.Name, + }) + require.NoError(err) + + defer iter.Close() + + for tpl := iter.Next(); tpl != nil; tpl = iter.Next() { + require.Equal(testfixtures.DocumentNS.Name, tpl.ResourceAndRelation.Namespace) + } + require.NoError(iter.Err()) + } + }) +} + +func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.QueryInterceptor, pgVersion string) datastore.Datastore { + require := require.New(t) + + ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion, false).NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newPostgresDatastore(uri, + RevisionQuantization(0), + GCWindow(time.Millisecond*1), + WatchBufferLength(1), + WithQueryInterceptor(interceptor), + ) + require.NoError(err) + return ds + }) + t.Cleanup(func() { + ds.Close() + }) + + ds, _ = testfixtures.StandardDatastoreWithData(ds, require) + + // Write namespaces and a few thousand relationships. + ctx := context.Background() + for i := 0; i < 1000; i++ { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + err := rwt.WriteNamespaces(ctx, namespace.Namespace( + fmt.Sprintf("resource%d", i), + namespace.MustRelation("reader", nil))) + if err != nil { + return err + } + + // Write some relationships. + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: testfixtures.DocumentNS.Name, + ObjectId: fmt.Sprintf("doc%d", i), + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + + rtu2 := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: fmt.Sprintf("resource%d", i), + ObjectId: "123", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + + rtu3 := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: fmt.Sprintf("resource%d", i), + ObjectId: "123", + Relation: "writer", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu, rtu2, rtu3}) + }) + require.NoError(err) + } + + // Delete some relationships. + for i := 990; i < 1000; i++ { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + rtu := tuple.Delete(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: testfixtures.DocumentNS.Name, + ObjectId: fmt.Sprintf("doc%d", i), + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + + rtu2 := tuple.Delete(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: fmt.Sprintf("resource%d", i), + ObjectId: "123", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu, rtu2}) + }) + require.NoError(err) + } + + // Write some more relationships. + for i := 1000; i < 1100; i++ { + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + // Write some relationships. + rtu := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: testfixtures.DocumentNS.Name, + ObjectId: fmt.Sprintf("doc%d", i), + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + + rtu2 := tuple.Touch(&core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: fmt.Sprintf("resource%d", i), + ObjectId: "123", + Relation: "reader", + }, + Subject: &core.ObjectAndRelation{ + Namespace: "user", + ObjectId: "456", + Relation: "...", + }, + }) + return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu, rtu2}) + }) + require.NoError(err) + } + + return ds +} + +func GCQueriesServedByExpectedIndexes(t *testing.T, _ testdatastore.RunningEngineForTest, pgVersion string) { + require := require.New(t) + interceptor := &withQueryInterceptor{explanations: make(map[string]string, 0)} + ds := datastoreWithInterceptorAndTestData(t, interceptor, pgVersion) + + // Get the head revision. + ctx := context.Background() + revision, err := ds.HeadRevision(ctx) + require.NoError(err) + + for { + wds, ok := ds.(datastore.UnwrappableDatastore) + if !ok { + break + } + ds = wds.Unwrap() + } + + casted, ok := ds.(common.GarbageCollector) + require.True(ok) + + _, err = casted.DeleteBeforeTx(context.Background(), revision) + require.NoError(err) + + require.NotEmpty(interceptor.explanations, "expected queries to be executed") + + // Ensure we have indexes representing each query in the GC workflow. + for _, explanation := range interceptor.explanations { + switch { + case strings.HasPrefix(explanation, "Delete on relation_tuple_transaction"): + fallthrough + + case strings.HasPrefix(explanation, "Delete on namespace_config"): + fallthrough + + case strings.HasPrefix(explanation, "Delete on relation_tuple"): + require.Contains(explanation, "Index Scan") + + default: + require.Failf("unknown GC query: %s", explanation) + } + } +} diff --git a/internal/datastore/postgres/postgres_test.go b/internal/datastore/postgres/postgres_test.go index 52b4417542..ab3057c1ef 100644 --- a/internal/datastore/postgres/postgres_test.go +++ b/internal/datastore/postgres/postgres_test.go @@ -1,1386 +1,20 @@ -//go:build ci && docker -// +build ci,docker +//go:build ci && docker && postgres +// +build ci,docker,postgres package postgres import ( - "context" - "fmt" - "math/rand" - "strings" - "sync" "testing" - "time" - - sq "github.com/Masterminds/squirrel" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - "github.com/samber/lo" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "golang.org/x/sync/errgroup" - - "github.com/authzed/spicedb/internal/datastore/common" - pgcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" - pgversion "github.com/authzed/spicedb/internal/datastore/postgres/version" - "github.com/authzed/spicedb/internal/testfixtures" - testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" - "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/test" - "github.com/authzed/spicedb/pkg/migrate" - "github.com/authzed/spicedb/pkg/namespace" - core "github.com/authzed/spicedb/pkg/proto/core/v1" - "github.com/authzed/spicedb/pkg/tuple" ) -const pgSerializationFailure = "40001" - -// Implement the TestableDatastore interface -func (pgd *pgDatastore) ExampleRetryableError() error { - return &pgconn.PgError{ - Code: pgSerializationFailure, - } -} - -type postgresConfig struct { - targetMigration string - migrationPhase string - pgVersion string - pgbouncer bool -} - -// the global OTel tracer is used everywhere, so we synchronize tests over a global test tracer -var ( - otelMutex = sync.Mutex{} - testTraceProvider *trace.TracerProvider - postgresConfigs = lo.FlatMap( - []string{pgversion.MinimumSupportedPostgresVersion, "14", "15", "16"}, - func(postgresVersion string, _ int) []postgresConfig { - return lo.Map([]bool{false, true}, func(enablePgbouncer bool, _ int) postgresConfig { - return postgresConfig{"head", "", postgresVersion, enablePgbouncer} - }) - }, - ) -) - -func init() { - testTraceProvider = trace.NewTracerProvider( - trace.WithSampler(trace.AlwaysSample()), - ) - otel.SetTracerProvider(testTraceProvider) -} - func TestPostgresDatastore(t *testing.T) { t.Parallel() - for _, config := range postgresConfigs { - pgbouncerStr := "" - if config.pgbouncer { - pgbouncerStr = "pgbouncer-" - } - t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) { - t.Parallel() - b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer) - - test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { - ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(uri, - RevisionQuantization(revisionQuantization), - GCWindow(gcWindow), - GCInterval(gcInterval), - WatchBufferLength(watchBufferLength), - DebugAnalyzeBeforeStatistics(), - MigrationPhase(config.migrationPhase), - ) - require.NoError(t, err) - return ds - }) - return ds, nil - })) - - t.Run("GarbageCollection", createDatastoreTest( - b, - GarbageCollectionTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - - t.Run("TransactionTimestamps", createDatastoreTest( - b, - TransactionTimestampsTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - - t.Run("GarbageCollectionByTime", createDatastoreTest( - b, - GarbageCollectionByTimeTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - - t.Run("ChunkedGarbageCollection", createDatastoreTest( - b, - ChunkedGarbageCollectionTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - - t.Run("QuantizedRevisions", func(t *testing.T) { - QuantizedRevisionTest(t, b) - }) - - t.Run("WatchNotEnabled", func(t *testing.T) { - WatchNotEnabledTest(t, b, config.pgVersion) - }) - - t.Run("GCQueriesServedByExpectedIndexes", func(t *testing.T) { - GCQueriesServedByExpectedIndexes(t, b, config.pgVersion) - }) - - if config.migrationPhase == "" { - t.Run("RevisionInversion", createDatastoreTest( - b, - RevisionInversionTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - - t.Run("ConcurrentRevisionHead", createDatastoreTest( - b, - ConcurrentRevisionHeadTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - - t.Run("ConcurrentRevisionWatch", createDatastoreTest( - b, - ConcurrentRevisionWatchTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(50), - MigrationPhase(config.migrationPhase), - )) - - t.Run("OverlappingRevisionWatch", createDatastoreTest( - b, - OverlappingRevisionWatchTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(50), - MigrationPhase(config.migrationPhase), - )) - } - - t.Run("OTelTracing", createDatastoreTest( - b, - OTelTracingTest, - RevisionQuantization(0), - GCWindow(1*time.Millisecond), - WatchBufferLength(1), - MigrationPhase(config.migrationPhase), - )) - }) - } + testPostgresDatastore(t, postgresConfigs) } func TestPostgresDatastoreWithoutCommitTimestamps(t *testing.T) { t.Parallel() - for _, config := range postgresConfigs { - pgVersion := config.pgVersion - enablePgbouncer := config.pgbouncer - t.Run(fmt.Sprintf("postgres-%s", pgVersion), func(t *testing.T) { - t.Parallel() - - b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer) - - // NOTE: watch API requires the commit timestamps, so we skip those tests here. - test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { - ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(uri, - RevisionQuantization(revisionQuantization), - GCWindow(gcWindow), - GCInterval(gcInterval), - WatchBufferLength(watchBufferLength), - DebugAnalyzeBeforeStatistics(), - ) - require.NoError(t, err) - return ds - }) - return ds, nil - }), test.WithCategories(test.WatchCategory)) - }) - } -} - -type datastoreTestFunc func(t *testing.T, ds datastore.Datastore) - -func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { - return func(t *testing.T) { - ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(uri, options...) - require.NoError(t, err) - return ds - }) - defer ds.Close() - - tf(t, ds) - } -} - -func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - firstWrite, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - // Write basic namespaces. - return rwt.WriteNamespaces(ctx, namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - ), namespace.Namespace("user")) - }) - require.NoError(err) - - // Run GC at the transaction and ensure no relationships are removed. - pds := ds.(*pgDatastore) - - // Nothing to GC - removed, err := pds.DeleteBeforeTx(ctx, firstWrite) - require.NoError(err) - require.Zero(removed.Relationships) - require.Zero(removed.Namespaces) - - // Replace the namespace with a new one. - updateTwoNamespaces, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces( - ctx, - namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - namespace.MustRelation("unused", nil), - ), - namespace.Namespace("user"), - ) - }) - require.NoError(err) - - // Run GC to remove the old transaction - removed, err = pds.DeleteBeforeTx(ctx, updateTwoNamespaces) - require.NoError(err) - require.Zero(removed.Relationships) - require.Equal(int64(1), removed.Transactions) // firstWrite - require.Equal(int64(2), removed.Namespaces) // resource, user - - // Write a relationship. - tpl := tuple.Parse("resource:someresource#reader@user:someuser#...") - - wroteOneRelationship, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) - require.NoError(err) - - // Run GC at the transaction and ensure no relationships are removed, but 1 transaction (the previous write namespace) is. - removed, err = pds.DeleteBeforeTx(ctx, wroteOneRelationship) - require.NoError(err) - require.Zero(removed.Relationships) - require.Equal(int64(1), removed.Transactions) // updateTwoNamespaces - require.Zero(removed.Namespaces) - - // Run GC again and ensure there are no changes. - removed, err = pds.DeleteBeforeTx(ctx, wroteOneRelationship) - require.NoError(err) - require.Zero(removed.Relationships) - require.Zero(removed.Transactions) - require.Zero(removed.Namespaces) - - // Ensure the relationship is still present. - tRequire := testfixtures.TupleChecker{Require: require, DS: ds} - tRequire.TupleExists(ctx, tpl, wroteOneRelationship) - - // Overwrite the relationship by changing its caveat. - tpl = tuple.MustWithCaveat(tpl, "somecaveat") - relOverwrittenAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH, tpl) - require.NoError(err) - - // Run GC, which won't clean anything because we're dropping the write transaction only - removed, err = pds.DeleteBeforeTx(ctx, relOverwrittenAt) - require.NoError(err) - require.Equal(int64(1), removed.Relationships) // wroteOneRelationship - require.Equal(int64(1), removed.Transactions) // wroteOneRelationship - require.Zero(removed.Namespaces) - - // Run GC again and ensure there are no changes. - removed, err = pds.DeleteBeforeTx(ctx, relOverwrittenAt) - require.NoError(err) - require.Zero(removed.Relationships) - require.Zero(removed.Transactions) - require.Zero(removed.Namespaces) - - // Ensure the relationship is still present. - tRequire.TupleExists(ctx, tpl, relOverwrittenAt) - - // Delete the relationship. - relDeletedAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tpl) - require.NoError(err) - - // Ensure the relationship is gone. - tRequire.NoTupleExists(ctx, tpl, relDeletedAt) - - // Run GC, which will now drop the overwrite transaction only and the first tpl revision - removed, err = pds.DeleteBeforeTx(ctx, relDeletedAt) - require.NoError(err) - require.Equal(int64(1), removed.Relationships) - require.Equal(int64(1), removed.Transactions) // relOverwrittenAt - require.Zero(removed.Namespaces) - - // Run GC again and ensure there are no changes. - removed, err = pds.DeleteBeforeTx(ctx, relDeletedAt) - require.NoError(err) - require.Zero(removed.Relationships) - require.Zero(removed.Transactions) - require.Zero(removed.Namespaces) - - // Write a the relationship a few times. - var relLastWriteAt datastore.Revision - for i := 0; i < 3; i++ { - tpl = tuple.MustWithCaveat(tpl, fmt.Sprintf("somecaveat%d", i)) - - var err error - relLastWriteAt, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH, tpl) - require.NoError(err) - } - - // Run GC at the transaction and ensure the older copies of the relationships are removed, - // as well as the 2 older write transactions and the older delete transaction. - removed, err = pds.DeleteBeforeTx(ctx, relLastWriteAt) - require.NoError(err) - require.Equal(int64(2), removed.Relationships) // delete, old1 - require.Equal(int64(3), removed.Transactions) // removed, write1, write2 - require.Zero(removed.Namespaces) - - // Ensure the relationship is still present. - tRequire.TupleExists(ctx, tpl, relLastWriteAt) - - // Inject a transaction to clean up the last write - lastRev, err := pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return nil - }) - require.NoError(err) - - // Run GC to clean up the last write - removed, err = pds.DeleteBeforeTx(ctx, lastRev) - require.NoError(err) - require.Zero(removed.Relationships) // write3 - require.Equal(int64(1), removed.Transactions) // write3 - require.Zero(removed.Namespaces) -} - -func TransactionTimestampsTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - - // Setting db default time zone to before UTC - pgd := ds.(*pgDatastore) - _, err = pgd.writePool.Exec(ctx, "SET TIME ZONE 'America/New_York';") - require.NoError(err) - - // Get timestamp in UTC as reference - startTimeUTC, err := pgd.Now(ctx) - require.NoError(err) - - // Transaction timestamp should not be stored in system time zone - tx, err := pgd.writePool.Begin(ctx) - require.NoError(err) - - txXID, _, err := createNewTransaction(ctx, tx) - require.NoError(err) - - err = tx.Commit(ctx) - require.NoError(err) - - var ts time.Time - sql, args, err := psql.Select("timestamp").From(tableTransaction).Where(sq.Eq{"xid": txXID}).ToSql() - require.NoError(err) - err = pgd.readPool.QueryRow(ctx, sql, args...).Scan(&ts) - require.NoError(err) - - // Transaction timestamp will be before the reference time if it was stored - // in the default time zone and reinterpreted - require.True(startTimeUTC.Before(ts)) -} - -func GarbageCollectionByTimeTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces(ctx, namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - ), namespace.Namespace("user")) - }) - require.NoError(err) - - pds := ds.(*pgDatastore) - - // Sleep 1ms to ensure GC will delete the previous transaction. - time.Sleep(1 * time.Millisecond) - - // Write a relationship. - tpl := tuple.Parse("resource:someresource#reader@user:someuser#...") - relLastWriteAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) - require.NoError(err) - - // Run GC and ensure only transactions were removed. - afterWrite, err := pds.Now(ctx) - require.NoError(err) - - afterWriteTx, err := pds.TxIDBefore(ctx, afterWrite) - require.NoError(err) - - removed, err := pds.DeleteBeforeTx(ctx, afterWriteTx) - require.NoError(err) - require.Zero(removed.Relationships) - require.True(removed.Transactions > 0) - require.Zero(removed.Namespaces) - - // Ensure the relationship is still present. - tRequire := testfixtures.TupleChecker{Require: require, DS: ds} - tRequire.TupleExists(ctx, tpl, relLastWriteAt) - - // Sleep 1ms to ensure GC will delete the previous write. - time.Sleep(1 * time.Millisecond) - - // Delete the relationship. - relDeletedAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tpl) - require.NoError(err) - - // Inject a revision to sweep up the last revision - _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return nil - }) - require.NoError(err) - - // Run GC and ensure the relationship is not removed. - afterDelete, err := pds.Now(ctx) - require.NoError(err) - - afterDeleteTx, err := pds.TxIDBefore(ctx, afterDelete) - require.NoError(err) - - removed, err = pds.DeleteBeforeTx(ctx, afterDeleteTx) - require.NoError(err) - require.Equal(int64(1), removed.Relationships) - require.Equal(int64(2), removed.Transactions) // relDeletedAt, injected - require.Zero(removed.Namespaces) - - // Ensure the relationship is still not present. - tRequire.NoTupleExists(ctx, tpl, relDeletedAt) -} - -const chunkRelationshipCount = 2000 - -func ChunkedGarbageCollectionTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces(ctx, namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - ), namespace.Namespace("user")) - }) - require.NoError(err) - - pds := ds.(*pgDatastore) - - // Prepare relationships to write. - var tpls []*core.RelationTuple - for i := 0; i < chunkRelationshipCount; i++ { - tpl := tuple.Parse(fmt.Sprintf("resource:resource-%d#reader@user:someuser#...", i)) - tpls = append(tpls, tpl) - } - - // Write a large number of relationships. - writtenAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpls...) - require.NoError(err) - - // Ensure the relationships were written. - tRequire := testfixtures.TupleChecker{Require: require, DS: ds} - for _, tpl := range tpls { - tRequire.TupleExists(ctx, tpl, writtenAt) - } - - // Run GC and ensure only transactions were removed. - afterWrite, err := pds.Now(ctx) - require.NoError(err) - - afterWriteTx, err := pds.TxIDBefore(ctx, afterWrite) - require.NoError(err) - - removed, err := pds.DeleteBeforeTx(ctx, afterWriteTx) - require.NoError(err) - require.Zero(removed.Relationships) - require.True(removed.Transactions > 0) - require.Zero(removed.Namespaces) - - // Sleep to ensure the relationships will GC. - time.Sleep(1 * time.Millisecond) - - // Delete all the relationships. - deletedAt, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tpls...) - require.NoError(err) - - // Inject a revision to sweep up the last revision - _, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return nil - }) - require.NoError(err) - - // Ensure the relationships were deleted. - for _, tpl := range tpls { - tRequire.NoTupleExists(ctx, tpl, deletedAt) - } - - // Sleep to ensure GC. - time.Sleep(1 * time.Millisecond) - - // Run GC and ensure all the stale relationships are removed. - afterDelete, err := pds.Now(ctx) - require.NoError(err) - - afterDeleteTx, err := pds.TxIDBefore(ctx, afterDelete) - require.NoError(err) - - removed, err = pds.DeleteBeforeTx(ctx, afterDeleteTx) - require.NoError(err) - require.Equal(int64(chunkRelationshipCount), removed.Relationships) - require.Equal(int64(2), removed.Transactions) - require.Zero(removed.Namespaces) -} - -func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { - testCases := []struct { - testName string - quantization time.Duration - relativeTimes []time.Duration - numLower uint64 - numHigher uint64 - }{ - { - "DefaultRevision", - 1 * time.Second, - []time.Duration{}, - 0, 0, - }, - { - "OnlyPastRevisions", - 1 * time.Second, - []time.Duration{-2 * time.Second}, - 1, 0, - }, - { - "OnlyFutureRevisions", - 1 * time.Second, - []time.Duration{2 * time.Second}, - 0, 1, - }, - { - "QuantizedLower", - 2 * time.Second, - []time.Duration{-4 * time.Second, -1 * time.Nanosecond, 0}, - 1, 2, - }, - { - "QuantizationDisabled", - 1 * time.Nanosecond, - []time.Duration{-2 * time.Second, -1 * time.Nanosecond, 0}, - 3, 0, - }, - } - - for _, tc := range testCases { - t.Run(tc.testName, func(t *testing.T) { - require := require.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - var conn *pgx.Conn - ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { - var err error - conn, err = pgx.Connect(ctx, uri) - RegisterTypes(conn.TypeMap()) - require.NoError(err) - - ds, err := newPostgresDatastore( - uri, - RevisionQuantization(5*time.Second), - GCWindow(24*time.Hour), - WatchBufferLength(1), - ) - require.NoError(err) - - return ds - }) - defer ds.Close() - - // set a random time zone to ensure the queries are unaffected by tz - _, err := conn.Exec(ctx, fmt.Sprintf("SET TIME ZONE -%d", rand.Intn(8)+1)) - require.NoError(err) - - var dbNow time.Time - err = conn.QueryRow(ctx, "SELECT (NOW() AT TIME ZONE 'utc')").Scan(&dbNow) - require.NoError(err) - - if len(tc.relativeTimes) > 0 { - psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) - insertTxn := psql.Insert(tableTransaction).Columns(colTimestamp) - - for _, offset := range tc.relativeTimes { - sql, args, err := insertTxn.Values(dbNow.Add(offset)).ToSql() - require.NoError(err) - - _, err = conn.Exec(ctx, sql, args...) - require.NoError(err) - } - } - - queryRevision := fmt.Sprintf( - querySelectRevision, - colXID, - tableTransaction, - colTimestamp, - tc.quantization.Nanoseconds(), - colSnapshot, - ) - - var revision xid8 - var snapshot pgSnapshot - var validFor time.Duration - err = conn.QueryRow(ctx, queryRevision).Scan(&revision, &snapshot, &validFor) - require.NoError(err) - - queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;" - numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true") - numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false") - - var numLower, numHigher uint64 - require.NoError(conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot) - require.NoError(conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot) - - // Subtract one from numLower because of the artificially injected first transaction row - require.Equal(tc.numLower, numLower-1) - require.Equal(tc.numHigher, numHigher) - }) - } -} - -// ConcurrentRevisionHeadTest uses goroutines and channels to intentionally set up a pair of -// revisions that are concurrently applied and then ensures a call to HeadRevision reflects -// the changes found in *both* revisions. -func ConcurrentRevisionHeadTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces(ctx, namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - ), namespace.Namespace("user")) - }) - require.NoError(err) - - g := errgroup.Group{} - - waitToStart := make(chan struct{}) - waitToFinish := make(chan struct{}) - - var commitLastRev, commitFirstRev datastore.Revision - g.Go(func() error { - var err error - commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: "resource", - ObjectId: "123", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) - require.NoError(err) - - close(waitToStart) - <-waitToFinish - - return err - }) - require.NoError(err) - return nil - }) - - <-waitToStart - - commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: "resource", - ObjectId: "789", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) - }) - close(waitToFinish) - - require.NoError(err) - require.NoError(g.Wait()) - - // Ensure the revisions do not compare. - require.False(commitFirstRev.GreaterThan(commitLastRev)) - require.False(commitFirstRev.Equal(commitLastRev)) - - // Ensure a call to HeadRevision now reflects both sets of data applied. - headRev, err := ds.HeadRevision(ctx) - require.NoError(err) - - reader := ds.SnapshotReader(headRev) - it, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{ - ResourceType: "resource", - }) - require.NoError(err) - defer it.Close() - - found := []*core.RelationTuple{} - for tpl := it.Next(); tpl != nil; tpl = it.Next() { - require.NoError(it.Err()) - found = append(found, tpl) - } - - require.Equal(2, len(found), "missing relationships in %v", found) -} - -// ConcurrentRevisionWatchTest uses goroutines and channels to intentionally set up a pair of -// revisions that are concurrently applied and then ensures that a Watch call does not end up -// in a loop. -func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - withCancel, cancel := context.WithCancel(ctx) - defer cancel() - - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - - // Write basic namespaces. - rev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces(ctx, namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - ), namespace.Namespace("user")) - }) - require.NoError(err) - - // Start a watch loop. - waitForWatch := make(chan struct{}) - seenWatchRevisions := make([]datastore.Revision, 0) - seenWatchRevisionsLock := sync.Mutex{} - - go func() { - changes, _ := ds.Watch(withCancel, rev) - - waitForWatch <- struct{}{} - - for { - select { - case change, ok := <-changes: - if !ok { - return - } - - seenWatchRevisionsLock.Lock() - seenWatchRevisions = append(seenWatchRevisions, change.Revision) - seenWatchRevisionsLock.Unlock() - - time.Sleep(1 * time.Millisecond) - case <-withCancel.Done(): - return - } - } - }() - - <-waitForWatch - - // Write the two concurrent transactions, while watching for changes. - g := errgroup.Group{} - - waitToStart := make(chan struct{}) - waitToFinish := make(chan struct{}) - - var commitLastRev, commitFirstRev datastore.Revision - g.Go(func() error { - var err error - commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ - tuple.Touch(tuple.MustParse("something:001#viewer@user:123")), - tuple.Touch(tuple.MustParse("something:002#viewer@user:123")), - tuple.Touch(tuple.MustParse("something:003#viewer@user:123")), - }) - require.NoError(err) - - close(waitToStart) - <-waitToFinish - - return err - }) - require.NoError(err) - return nil - }) - - <-waitToStart - - commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{ - tuple.Touch(tuple.MustParse("resource:1001#reader@user:456")), - tuple.Touch(tuple.MustParse("resource:1002#reader@user:456")), - tuple.Touch(tuple.MustParse("resource:1003#reader@user:456")), - }) - }) - close(waitToFinish) - - require.NoError(err) - require.NoError(g.Wait()) - - // Ensure the revisions do not compare. - require.False(commitFirstRev.GreaterThan(commitLastRev)) - require.False(commitLastRev.GreaterThan(commitFirstRev)) - require.False(commitFirstRev.Equal(commitLastRev)) - - // Write another revision. - afterRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: "resource", - ObjectId: "2345", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) - }) - require.NoError(err) - require.True(afterRev.GreaterThan(commitFirstRev)) - require.True(afterRev.GreaterThan(commitLastRev)) - - // Ensure that the last revision is eventually seen from the watch. - require.Eventually(func() bool { - seenWatchRevisionsLock.Lock() - defer seenWatchRevisionsLock.Unlock() - return len(seenWatchRevisions) == 3 && seenWatchRevisions[len(seenWatchRevisions)-1].String() == afterRev.String() - }, 2*time.Second, 5*time.Millisecond) -} - -func OverlappingRevisionWatchTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - - rev, err := ds.HeadRevision(ctx) - require.NoError(err) - - pds := ds.(*pgDatastore) - require.True(pds.watchEnabled) - - prev := rev.(postgresRevision) - nexttx := prev.snapshot.xmax + 1 - - // Manually construct an equivalent of overlapping transactions in the database, from the repro - // information (See: https://github.com/authzed/spicedb/issues/1272) - err = pgx.BeginTxFunc(ctx, pds.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { - _, err := tx.Exec(ctx, fmt.Sprintf( - `INSERT INTO %s ("%s", "%s") VALUES ('%d', '%d:%d:')`, - tableTransaction, - colXID, - colSnapshot, - nexttx, - nexttx, - nexttx, - )) - if err != nil { - return err - } - - _, err = tx.Exec(ctx, fmt.Sprintf( - `INSERT INTO %s ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s") VALUES ('somenamespace', '123', 'viewer', 'user', '456', '...', '', null, '%d'::xid8)`, - tableTuple, - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colCreatedXid, - nexttx, - )) - if err != nil { - return err - } - - _, err = tx.Exec(ctx, fmt.Sprintf( - `INSERT INTO %s ("xid", "snapshot") VALUES ('%d', '%d:%d:')`, - tableTransaction, - nexttx+1, - nexttx, - nexttx, - )) - - if err != nil { - return err - } - - _, err = tx.Exec(ctx, fmt.Sprintf( - `INSERT INTO %s ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s") VALUES ('somenamespace', '456', 'viewer', 'user', '456', '...', '', null, '%d'::xid8)`, - tableTuple, - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colCreatedXid, - nexttx+1, - )) - - return err - }) - require.NoError(err) - - // Call watch and ensure it terminates with having only read the two expected sets of changes. - changes, errChan := ds.Watch(ctx, rev) - transactionCount := 0 -loop: - for { - select { - case _, ok := <-changes: - if !ok { - err := <-errChan - require.NoError(err) - return - } - - transactionCount++ - time.Sleep(10 * time.Millisecond) - case <-time.NewTimer(1 * time.Second).C: - break loop - } - } - - require.Equal(2, transactionCount) -} - -// RevisionInversionTest uses goroutines and channels to intentionally set up a pair of -// revisions that might compare incorrectly. -func RevisionInversionTest(t *testing.T, ds datastore.Datastore) { - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - // Write basic namespaces. - _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces(ctx, namespace.Namespace( - "resource", - namespace.MustRelation("reader", nil), - ), namespace.Namespace("user")) - }) - require.NoError(err) - - g := errgroup.Group{} - - waitToStart := make(chan struct{}) - waitToFinish := make(chan struct{}) - - var commitLastRev, commitFirstRev datastore.Revision - g.Go(func() error { - var err error - commitLastRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: "resource", - ObjectId: "123", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - err = rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) - require.NoError(err) - - close(waitToStart) - <-waitToFinish - - return err - }) - require.NoError(err) - return nil - }) - - <-waitToStart - - commitFirstRev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: "resource", - ObjectId: "789", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "ten", - Relation: "...", - }, - }) - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu}) - }) - close(waitToFinish) - - require.NoError(err) - require.NoError(g.Wait()) - require.False(commitFirstRev.GreaterThan(commitLastRev)) - require.False(commitFirstRev.Equal(commitLastRev)) -} - -func OTelTracingTest(t *testing.T, ds datastore.Datastore) { - otelMutex.Lock() - defer otelMutex.Unlock() - - require := require.New(t) - - ctx := context.Background() - r, err := ds.ReadyState(ctx) - require.NoError(err) - require.True(r.IsReady) - - spanrecorder := tracetest.NewSpanRecorder() - testTraceProvider.RegisterSpanProcessor(spanrecorder) - - // Perform basic operation - _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - return rwt.WriteNamespaces(ctx, namespace.Namespace("resource")) - }) - require.NoError(err) - - ended := spanrecorder.Ended() - var present bool - for _, span := range ended { - if span.Name() == "query INSERT" { - present = true - } - } - require.True(present, "missing trace for Streaming gRPC call") -} - -func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgVersion string) { - require := require.New(t) - - ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion, false).NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(uri, - RevisionQuantization(0), - GCWindow(time.Millisecond*1), - WatchBufferLength(1), - ) - require.NoError(err) - return ds - }) - defer ds.Close() - - ds, revision := testfixtures.StandardDatastoreWithData(ds, require) - _, errChan := ds.Watch( - context.Background(), - revision, - ) - err := <-errChan - require.NotNil(err) - require.Contains(err.Error(), "track_commit_timestamp=on") -} - -func BenchmarkPostgresQuery(b *testing.B) { - req := require.New(b) - - ds := testdatastore.RunPostgresForTesting(b, "", migrate.Head, pgversion.MinimumSupportedPostgresVersion, false).NewDatastore(b, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(uri, - RevisionQuantization(0), - GCWindow(time.Millisecond*1), - WatchBufferLength(1), - ) - require.NoError(b, err) - return ds - }) - defer ds.Close() - ds, revision := testfixtures.StandardDatastoreWithData(ds, req) - - b.Run("benchmark checks", func(b *testing.B) { - require := require.New(b) - - for i := 0; i < b.N; i++ { - iter, err := ds.SnapshotReader(revision).QueryRelationships(context.Background(), datastore.RelationshipsFilter{ - ResourceType: testfixtures.DocumentNS.Name, - }) - require.NoError(err) - - defer iter.Close() - - for tpl := iter.Next(); tpl != nil; tpl = iter.Next() { - require.Equal(testfixtures.DocumentNS.Name, tpl.ResourceAndRelation.Namespace) - } - require.NoError(iter.Err()) - } - }) -} - -func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.QueryInterceptor, pgVersion string) datastore.Datastore { - require := require.New(t) - - ds := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", migrate.Head, false, pgVersion, false).NewDatastore(t, func(engine, uri string) datastore.Datastore { - ds, err := newPostgresDatastore(uri, - RevisionQuantization(0), - GCWindow(time.Millisecond*1), - WatchBufferLength(1), - WithQueryInterceptor(interceptor), - ) - require.NoError(err) - return ds - }) - t.Cleanup(func() { - ds.Close() - }) - - ds, _ = testfixtures.StandardDatastoreWithData(ds, require) - - // Write namespaces and a few thousand relationships. - ctx := context.Background() - for i := 0; i < 1000; i++ { - _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - err := rwt.WriteNamespaces(ctx, namespace.Namespace( - fmt.Sprintf("resource%d", i), - namespace.MustRelation("reader", nil))) - if err != nil { - return err - } - - // Write some relationships. - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: testfixtures.DocumentNS.Name, - ObjectId: fmt.Sprintf("doc%d", i), - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - - rtu2 := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: fmt.Sprintf("resource%d", i), - ObjectId: "123", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - - rtu3 := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: fmt.Sprintf("resource%d", i), - ObjectId: "123", - Relation: "writer", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu, rtu2, rtu3}) - }) - require.NoError(err) - } - - // Delete some relationships. - for i := 990; i < 1000; i++ { - _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - rtu := tuple.Delete(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: testfixtures.DocumentNS.Name, - ObjectId: fmt.Sprintf("doc%d", i), - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - - rtu2 := tuple.Delete(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: fmt.Sprintf("resource%d", i), - ObjectId: "123", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu, rtu2}) - }) - require.NoError(err) - } - - // Write some more relationships. - for i := 1000; i < 1100; i++ { - _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - // Write some relationships. - rtu := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: testfixtures.DocumentNS.Name, - ObjectId: fmt.Sprintf("doc%d", i), - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - - rtu2 := tuple.Touch(&core.RelationTuple{ - ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: fmt.Sprintf("resource%d", i), - ObjectId: "123", - Relation: "reader", - }, - Subject: &core.ObjectAndRelation{ - Namespace: "user", - ObjectId: "456", - Relation: "...", - }, - }) - return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{rtu, rtu2}) - }) - require.NoError(err) - } - - return ds -} - -func GCQueriesServedByExpectedIndexes(t *testing.T, _ testdatastore.RunningEngineForTest, pgVersion string) { - require := require.New(t) - interceptor := &withQueryInterceptor{explanations: make(map[string]string, 0)} - ds := datastoreWithInterceptorAndTestData(t, interceptor, pgVersion) - - // Get the head revision. - ctx := context.Background() - revision, err := ds.HeadRevision(ctx) - require.NoError(err) - - for { - wds, ok := ds.(datastore.UnwrappableDatastore) - if !ok { - break - } - ds = wds.Unwrap() - } - - casted, ok := ds.(common.GarbageCollector) - require.True(ok) - - _, err = casted.DeleteBeforeTx(context.Background(), revision) - require.NoError(err) - - require.NotEmpty(interceptor.explanations, "expected queries to be executed") - - // Ensure we have indexes representing each query in the GC workflow. - for _, explanation := range interceptor.explanations { - switch { - case strings.HasPrefix(explanation, "Delete on relation_tuple_transaction"): - fallthrough - - case strings.HasPrefix(explanation, "Delete on namespace_config"): - fallthrough - - case strings.HasPrefix(explanation, "Delete on relation_tuple"): - require.Contains(explanation, "Index Scan") - - default: - require.Failf("unknown GC query: %s", explanation) - } - } + testPostgresDatastoreWithoutCommitTimestamps(t, postgresConfigs) } diff --git a/magefiles/test.go b/magefiles/test.go index 6f8c9e7f52..3f58345c43 100644 --- a/magefiles/test.go +++ b/magefiles/test.go @@ -5,6 +5,7 @@ package main import ( "fmt" "os" + "strings" "github.com/magefile/mage/mg" "github.com/magefile/mage/sh" @@ -72,7 +73,12 @@ func (Testds) Spanner() error { // Postgres Run datastore tests for postgres func (Testds) Postgres() error { - return datastoreTest("postgres") + return datastoreTest("postgres", "postgres") +} + +// Pgbouncer Run datastore tests for postgres with Pgbouncer +func (Testds) Pgbouncer() error { + return datastoreTest("postgres", "pgbouncer") } // Mysql Run datastore tests for mysql @@ -80,9 +86,11 @@ func (Testds) Mysql() error { return datastoreTest("mysql") } -func datastoreTest(datastore string) error { +func datastoreTest(datastore string, tags ...string) error { + mergedTags := append([]string{"ci", "docker"}, tags...) + tagString := strings.Join(mergedTags, ",") mg.Deps(checkDocker) - return goTest(fmt.Sprintf("./internal/datastore/%s/...", datastore), "-tags", "ci,docker", "-timeout", "10m") + return goTest(fmt.Sprintf("./internal/datastore/%s/...", datastore), "-tags", tagString, "-timeout", "10m") } type Testcons mg.Namespace @@ -102,6 +110,13 @@ func (Testcons) Postgres() error { return consistencyTest("postgres") } +// Pgbouncer Run consistency tests for postgres with pgbouncer +// FIXME actually implement this +func (Testcons) Pgbouncer() error { + println("postgres+pgbouncer consistency tests are not implemented") + return nil +} + // Mysql Run consistency tests for mysql func (Testcons) Mysql() error { return consistencyTest("mysql")