diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go index 7c37737442..e2352367d0 100644 --- a/e2e/newenemy/newenemy_test.go +++ b/e2e/newenemy/newenemy_test.go @@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb require.NoError(t, err) t.Log("r2 token: ", r2.WrittenAt.Token) - z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) - z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) t.Log("z1 revision: ", z1) t.Log("z2 revision: ", z2) diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index b204fca607..9bae1bdd72 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -15,10 +15,16 @@ import ( type MockDatastore struct { mock.Mock + + CurrentUniqueID string } func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { - return "mockds", nil + if dm.CurrentUniqueID == "" { + return "mockds", nil + } + + return dm.CurrentUniqueID, nil } func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/proxy/relationshipintegrity.go b/internal/datastore/proxy/relationshipintegrity.go index 4b129b14d1..aea0fb34e7 100644 --- a/internal/datastore/proxy/relationshipintegrity.go +++ b/internal/datastore/proxy/relationshipintegrity.go @@ -197,6 +197,10 @@ func (r *relationshipIntegrityProxy) OptimizedRevision(ctx context.Context) (dat return r.ds.OptimizedRevision(ctx) } +func (r *relationshipIntegrityProxy) UniqueID(ctx context.Context) (string, error) { + return r.ds.UniqueID(ctx) +} + func (r *relationshipIntegrityProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) { return r.ds.ReadyState(ctx) } diff --git a/internal/datastore/revisions/commonrevision.go b/internal/datastore/revisions/commonrevision.go index 709272818e..dc1d079924 100644 --- a/internal/datastore/revisions/commonrevision.go +++ b/internal/datastore/revisions/commonrevision.go @@ -1,6 +1,8 @@ package revisions import ( + "context" + "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc { // CommonDecoder is a revision decoder that can decode revisions of a given kind. type CommonDecoder struct { - Kind RevisionKind + Kind RevisionKind + DatastoreUniqueID string +} + +func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) { + return cd.DatastoreUniqueID, nil } func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) { diff --git a/internal/services/integrationtesting/cert_test.go b/internal/services/integrationtesting/cert_test.go index 465122a34e..cb33bb8281 100644 --- a/internal/services/integrationtesting/cert_test.go +++ b/internal/services/integrationtesting/cert_test.go @@ -148,7 +148,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor("testing"), + Middleware: consistency.UnaryServerInterceptor("testing", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -167,7 +167,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor("testing"), + Middleware: consistency.StreamServerInterceptor("testing", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -211,7 +211,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, @@ -264,7 +264,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/integrationtesting/consistencytestutil/servicetester.go b/internal/services/integrationtesting/consistencytestutil/servicetester.go index f4783ae8ea..e3fad25b3f 100644 --- a/internal/services/integrationtesting/consistencytestutil/servicetester.go +++ b/internal/services/integrationtesting/consistencytestutil/servicetester.go @@ -78,7 +78,7 @@ func (v1st v1ServiceTester) Check(ctx context.Context, resource tuple.ObjectAndR }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: context, @@ -98,7 +98,7 @@ func (v1st v1ServiceTester) Expand(ctx context.Context, resource tuple.ObjectAnd Permission: resource.Relation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -134,7 +134,7 @@ func (v1st v1ServiceTester) Read(_ context.Context, namespaceName string, atRevi }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -181,7 +181,7 @@ func (v1st v1ServiceTester) LookupResources(_ context.Context, resourceRelation }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, OptionalLimit: limit, @@ -230,7 +230,7 @@ func (v1st v1ServiceTester) LookupSubjects(_ context.Context, resource tuple.Obj OptionalSubjectRelation: optionalizeRelation(subjectRelation.Relation), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: builtContext, @@ -260,7 +260,7 @@ func (v1st v1ServiceTester) BulkCheck(ctx context.Context, items []*v1.BulkCheck Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -276,7 +276,7 @@ func (v1st v1ServiceTester) CheckBulk(ctx context.Context, items []*v1.CheckBulk Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) diff --git a/internal/services/integrationtesting/perf_test.go b/internal/services/integrationtesting/perf_test.go index e641ab89fc..e9e7377abf 100644 --- a/internal/services/integrationtesting/perf_test.go +++ b/internal/services/integrationtesting/perf_test.go @@ -58,7 +58,7 @@ func TestBurst(t *testing.T) { _, err := client.CheckPermission(context.Background(), &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/v1/debug_test.go b/internal/services/v1/debug_test.go index 3cf6df6581..481f020b98 100644 --- a/internal/services/v1/debug_test.go +++ b/internal/services/v1/debug_test.go @@ -508,7 +508,7 @@ func TestCheckPermissionWithDebug(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: stc.checkRequest.resource, diff --git a/internal/services/v1/experimental.go b/internal/services/v1/experimental.go index de68d3a678..2e34eeb8bf 100644 --- a/internal/services/v1/experimental.go +++ b/internal/services/v1/experimental.go @@ -525,10 +525,16 @@ func (es *experimentalServer) ExperimentalReflectSchema(ctx context.Context, req } } + ds := datastoremw.MustFromContext(ctx) + readAt, err := zedtoken.NewFromRevision(ctx, atRevision, ds) + if err != nil { + return nil, shared.RewriteErrorWithoutConfig(ctx, err) + } + return &v1.ExperimentalReflectSchemaResponse{ Definitions: definitions, Caveats: caveats, - ReadAt: zedtoken.MustNewFromRevision(atRevision), + ReadAt: readAt, }, nil } @@ -543,12 +549,21 @@ func (es *experimentalServer) ExperimentalDiffSchema(ctx context.Context, req *v return nil, shared.RewriteErrorWithoutConfig(ctx, err) } - resp, err := convertDiff(diff, existingSchema, comparisonSchema, atRevision) + ds := datastoremw.MustFromContext(ctx) + diffs, err := convertDiff(diff, existingSchema, comparisonSchema) if err != nil { return nil, shared.RewriteErrorWithoutConfig(ctx, err) } - return resp, nil + readAt, err := zedtoken.NewFromRevision(ctx, atRevision, ds) + if err != nil { + return nil, shared.RewriteErrorWithoutConfig(ctx, err) + } + + return &v1.ExperimentalDiffSchemaResponse{ + Diffs: diffs, + ReadAt: readAt, + }, nil } func (es *experimentalServer) ExperimentalComputablePermissions(ctx context.Context, req *v1.ExperimentalComputablePermissionsRequest) (*v1.ExperimentalComputablePermissionsResponse, error) { @@ -749,11 +764,16 @@ func (es *experimentalServer) ExperimentalCountRelationships(ctx context.Context return nil, spiceerrors.MustBugf("count should not be negative") } + readAt, err := zedtoken.NewFromRevision(ctx, headRev, ds) + if err != nil { + return nil, shared.RewriteErrorWithoutConfig(ctx, err) + } + return &v1.ExperimentalCountRelationshipsResponse{ CounterResult: &v1.ExperimentalCountRelationshipsResponse_ReadCounterValue{ ReadCounterValue: &v1.ReadCounterValue{ RelationshipCount: uintCount, - ReadAt: zedtoken.MustNewFromRevision(headRev), + ReadAt: readAt, }, }, }, nil diff --git a/internal/services/v1/expreflection.go b/internal/services/v1/expreflection.go index 212cfd7b37..740f68841d 100644 --- a/internal/services/v1/expreflection.go +++ b/internal/services/v1/expreflection.go @@ -9,7 +9,6 @@ import ( "github.com/authzed/spicedb/pkg/caveats" caveattypes "github.com/authzed/spicedb/pkg/caveats/types" - "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/diff" caveatdiff "github.com/authzed/spicedb/pkg/diff/caveats" nsdiff "github.com/authzed/spicedb/pkg/diff/namespace" @@ -18,7 +17,6 @@ import ( iv1 "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" - "github.com/authzed/spicedb/pkg/zedtoken" ) type schemaFilters struct { @@ -180,8 +178,7 @@ func convertDiff( diff *diff.SchemaDiff, existingSchema *diff.DiffableSchema, comparisonSchema *diff.DiffableSchema, - atRevision datastore.Revision, -) (*v1.ExperimentalDiffSchemaResponse, error) { +) ([]*v1.ExpSchemaDiff, error) { size := len(diff.AddedNamespaces) + len(diff.RemovedNamespaces) + len(diff.AddedCaveats) + len(diff.RemovedCaveats) + len(diff.ChangedNamespaces) + len(diff.ChangedCaveats) diffs := make([]*v1.ExpSchemaDiff, 0, size) @@ -513,10 +510,7 @@ func convertDiff( } } - return &v1.ExperimentalDiffSchemaResponse{ - Diffs: diffs, - ReadAt: zedtoken.MustNewFromRevision(atRevision), - }, nil + return diffs, nil } // namespaceAPIReprForName builds an API representation of a namespace. diff --git a/internal/services/v1/expreflection_test.go b/internal/services/v1/expreflection_test.go index fdc902dbc1..6943d97ea0 100644 --- a/internal/services/v1/expreflection_test.go +++ b/internal/services/v1/expreflection_test.go @@ -9,7 +9,6 @@ import ( "github.com/ettle/strcase" "github.com/stretchr/testify/require" - "github.com/authzed/spicedb/pkg/datastore/revisionparsing" "github.com/authzed/spicedb/pkg/diff" "github.com/authzed/spicedb/pkg/genutil/mapz" "github.com/authzed/spicedb/pkg/schemadsl/compiler" @@ -544,21 +543,18 @@ func TestConvertDiff(t *testing.T) { diff, err := diff.DiffSchemas(es, cs) require.NoError(t, err) - resp, err := convertDiff( + diffs, err := convertDiff( diff, &es, &cs, - revisionparsing.MustParseRevisionForTest("1"), ) if err != nil { t.Fatalf("unexpected error: %v", err) } - require.NotNil(t, resp.ReadAt) - resp.ReadAt = nil - testutil.RequireProtoEqual(t, tc.expectedResponse, resp, "got mismatch") + testutil.RequireProtoEqual(t, tc.expectedResponse, &v1.ExperimentalDiffSchemaResponse{Diffs: diffs}, "got mismatch") - for _, diff := range resp.Diffs { + for _, diff := range diffs { name := reflect.TypeOf(diff.GetDiff()).String() encounteredDiffTypes.Add(strings.ToLower(strings.Split(name, "_")[1])) } diff --git a/internal/services/v1/metadata_test.go b/internal/services/v1/metadata_test.go index 3acbc131a4..e8eaa8eb3e 100644 --- a/internal/services/v1/metadata_test.go +++ b/internal/services/v1/metadata_test.go @@ -38,7 +38,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -53,7 +53,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckBulkPermissions(ctx, &v1.CheckBulkPermissionsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Items: []*v1.CheckBulkPermissionsRequestItem{ @@ -96,7 +96,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.ExpandPermissionTree(ctx, &v1.ExpandPermissionTreeRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -130,7 +130,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupResources(ctx, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -155,7 +155,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupSubjects(ctx, &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -188,7 +188,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.ExportBulkRelationships(ctx, &v1.ExportBulkRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) diff --git a/internal/services/v1/permissions_test.go b/internal/services/v1/permissions_test.go index ff0bd11f59..e4d35b06e7 100644 --- a/internal/services/v1/permissions_test.go +++ b/internal/services/v1/permissions_test.go @@ -280,7 +280,7 @@ func TestCheckPermissions(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: tc.resource, @@ -335,7 +335,7 @@ func TestCheckPermissionWithWildcardSubject(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -361,7 +361,7 @@ func TestCheckPermissionWithDebugInfo(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -423,7 +423,7 @@ func TestCheckPermissionWithDebugInfoInError(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "doc1"), @@ -658,7 +658,7 @@ func TestLookupResources(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -735,7 +735,7 @@ func TestExpand(t *testing.T) { Permission: tc.startPermission, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -977,7 +977,7 @@ func TestLookupSubjects(t *testing.T) { OptionalSubjectRelation: tc.subjectRelation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -1025,7 +1025,7 @@ func TestCheckWithCaveats(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "caveatedplan"), @@ -1137,7 +1137,7 @@ func TestCheckWithCaveatErrors(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "firstdoc"), @@ -1190,7 +1190,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request := &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1236,7 +1236,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request = &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1309,7 +1309,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1354,7 +1354,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1399,7 +1399,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1473,7 +1473,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1512,7 +1512,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1658,7 +1658,7 @@ func TestLookupResourcesWithCursors(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, OptionalLimit: uintLimit, @@ -1727,7 +1727,7 @@ func TestLookupResourcesDeduplication(t *testing.T) { Subject: sub("user", "tom", ""), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }) diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index c5fb52f354..547401aa5a 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -378,8 +378,13 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ writeUpdateCounter.WithLabelValues(v1.RelationshipUpdate_Operation_name[int32(kind)]).Observe(float64(count)) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.WriteRelationshipsResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } @@ -496,8 +501,13 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del return nil, ps.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.DeleteRelationshipsResponse{ - DeletedAt: zedtoken.MustNewFromRevision(revision), + DeletedAt: zedToken, DeletionProgress: deletionProgress, }, nil } diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 928e60f977..97b5a062cf 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -302,7 +302,7 @@ func TestReadRelationships(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: tc.filter, @@ -1267,7 +1267,7 @@ func TestDeleteRelationships(t *testing.T) { } require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(tc.deleted), readAll(require, client, resp.DeletedAt)) @@ -1364,7 +1364,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { headRev, err := ds.HeadRevision(context.Background()) require.NoError(err) - beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) resp, err := client.DeleteRelationships(context.Background(), &v1.DeleteRelationshipsRequest{ RelationshipFilter: &v1.RelationshipFilter{ @@ -1375,7 +1375,10 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { }) require.NoError(err) - afterDelete := readOfType(require, "document", client, resp.DeletedAt) + headRev, err = ds.HeadRevision(context.Background()) + require.NoError(err) + + afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) require.LessOrEqual(len(beforeDelete)-len(afterDelete), batchSize) if i == 0 { @@ -1386,7 +1389,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(expected), readAll(require, client, resp.DeletedAt)) @@ -1491,7 +1494,7 @@ func TestWriteRelationshipsWithMetadata(t *testing.T) { require.NoError(err) - beforeWriteToken := zedtoken.MustNewFromRevision(beforeWriteRev) + beforeWriteToken := zedtoken.MustNewFromRevisionForTesting(beforeWriteRev) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -1750,7 +1753,7 @@ func TestReadRelationshipsInvalidCursor(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: &v1.RelationshipFilter{ diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index 5f11849042..193a7ac6a9 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -99,9 +99,14 @@ func (ss *schemaServer) ReadSchema(ctx context.Context, _ *v1.ReadSchemaRequest) DispatchCount: dispatchCount, }) + zedToken, err := zedtoken.NewFromRevision(ctx, headRevision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.ReadSchemaResponse{ SchemaText: schemaText, - ReadAt: zedtoken.MustNewFromRevision(headRevision), + ReadAt: zedToken, }, nil } @@ -152,7 +157,12 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque return nil, ss.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.WriteSchemaResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } diff --git a/internal/services/v1/watch.go b/internal/services/v1/watch.go index ab0d839595..02e49c893c 100644 --- a/internal/services/v1/watch.go +++ b/internal/services/v1/watch.go @@ -51,11 +51,15 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS var afterRevision datastore.Revision if req.OptionalStartCursor != nil && req.OptionalStartCursor.Token != "" { - decodedRevision, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) + decodedRevision, tokenStatus, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) if err != nil { return status.Errorf(codes.InvalidArgument, "failed to decode start revision: %s", err) } + if tokenStatus == zedtoken.StatusMismatchedDatastoreID { + return status.Errorf(codes.InvalidArgument, "start revision was generated by a different datastore") + } + afterRevision = decodedRevision } else { var err error @@ -94,6 +98,11 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if ok { filtered := filterUpdates(objectTypes, filters, update.RelationshipChanges) if len(filtered) > 0 { + zedToken, err := zedtoken.NewFromRevision(ctx, update.Revision, ds) + if err != nil { + return err + } + converted, err := tuple.UpdatesToV1RelationshipUpdates(filtered) if err != nil { return status.Errorf(codes.Internal, "failed to convert updates: %s", err) @@ -101,7 +110,7 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if err := stream.Send(&v1.WatchResponse{ Updates: converted, - ChangesThrough: zedtoken.MustNewFromRevision(update.Revision), + ChangesThrough: zedToken, OptionalTransactionMetadata: update.Metadata, }); err != nil { return status.Errorf(codes.Canceled, "watch canceled by user: %s", err) diff --git a/internal/services/v1/watch_test.go b/internal/services/v1/watch_test.go index 7b9ea41651..84ab930ca1 100644 --- a/internal/services/v1/watch_test.go +++ b/internal/services/v1/watch_test.go @@ -210,7 +210,7 @@ func TestWatch(t *testing.T) { t.Cleanup(cleanup) client := v1.NewWatchServiceClient(conn) - cursor := zedtoken.MustNewFromRevision(revision) + cursor := zedtoken.MustNewFromRevisionForTesting(revision) if tc.startCursor != nil { cursor = tc.startCursor } diff --git a/internal/testserver/server.go b/internal/testserver/server.go index 1b8fd6d991..7f629d98b0 100644 --- a/internal/testserver/server.go +++ b/internal/testserver/server.go @@ -105,7 +105,7 @@ func NewTestServerWithConfigAndDatastore(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor("testserver"), + Middleware: consistency.UnaryServerInterceptor("testserver", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -128,7 +128,7 @@ func NewTestServerWithConfigAndDatastore(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor("testserver"), + Middleware: consistency.StreamServerInterceptor("testserver", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go index 97d5a613cb..b19e0af1ad 100644 --- a/pkg/cmd/serve.go +++ b/pkg/cmd/serve.go @@ -112,6 +112,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error { apiFlags.Uint32Var(&config.MaxDeleteRelationshipsLimit, "max-delete-relationships-limit", 1000, "maximum number of relationships that can be deleted in a single request") apiFlags.Uint32Var(&config.MaxLookupResourcesLimit, "max-lookup-resources-limit", 1000, "maximum number of resources that can be looked up in a single request") apiFlags.Uint32Var(&config.MaxBulkExportRelationshipsLimit, "max-bulk-export-relationships-limit", 10_000, "maximum number of relationships that can be exported in a single request") + apiFlags.StringVar(&config.MismatchZedTokenBehavior, "mismatch-zed-token-behavior", "full-consistency", "behavior when a mismatched zedtoken is encountered. One of: full-consistency (treat as a full-consistency call), min-latency (treat as a min-latency call), error (return an error). defaults to full-consistency for safety.") datastoreFlags := nfs.FlagSet(BoldBlue("Datastore")) // Flags for the datastore diff --git a/pkg/cmd/server/defaults.go b/pkg/cmd/server/defaults.go index b1aff63b7d..228e22ac79 100644 --- a/pkg/cmd/server/defaults.go +++ b/pkg/cmd/server/defaults.go @@ -181,14 +181,15 @@ const ( //go:generate go run github.com/ecordell/optgen -output zz_generated.middlewareoption.go . MiddlewareOption type MiddlewareOption struct { - Logger zerolog.Logger `debugmap:"hidden"` - AuthFunc grpcauth.AuthFunc `debugmap:"hidden"` - EnableVersionResponse bool `debugmap:"visible"` - DispatcherForMiddleware dispatch.Dispatcher `debugmap:"hidden"` - EnableRequestLog bool `debugmap:"visible"` - EnableResponseLog bool `debugmap:"visible"` - DisableGRPCHistogram bool `debugmap:"visible"` - MiddlewareServiceLabel string `debugmap:"visible"` + Logger zerolog.Logger `debugmap:"hidden"` + AuthFunc grpcauth.AuthFunc `debugmap:"hidden"` + EnableVersionResponse bool `debugmap:"visible"` + DispatcherForMiddleware dispatch.Dispatcher `debugmap:"hidden"` + EnableRequestLog bool `debugmap:"visible"` + EnableResponseLog bool `debugmap:"visible"` + DisableGRPCHistogram bool `debugmap:"visible"` + MiddlewareServiceLabel string `debugmap:"visible"` + MismatchingZedTokenOption consistencymw.MismatchingTokenOption `debugmap:"visible"` unaryDatastoreMiddleware *ReferenceableMiddleware[grpc.UnaryServerInterceptor] `debugmap:"hidden"` streamDatastoreMiddleware *ReferenceableMiddleware[grpc.StreamServerInterceptor] `debugmap:"hidden"` @@ -221,6 +222,7 @@ func (m MiddlewareOption) WithDatastoreMiddleware(middleware Middleware) Middlew EnableResponseLog: m.EnableResponseLog, DisableGRPCHistogram: m.DisableGRPCHistogram, MiddlewareServiceLabel: m.MiddlewareServiceLabel, + MismatchingZedTokenOption: m.MismatchingZedTokenOption, unaryDatastoreMiddleware: &unary, streamDatastoreMiddleware: &stream, } @@ -248,6 +250,7 @@ func (m MiddlewareOption) WithDatastore(ds datastore.Datastore) MiddlewareOption EnableResponseLog: m.EnableResponseLog, DisableGRPCHistogram: m.DisableGRPCHistogram, MiddlewareServiceLabel: m.MiddlewareServiceLabel, + MismatchingZedTokenOption: m.MismatchingZedTokenOption, unaryDatastoreMiddleware: &unary, streamDatastoreMiddleware: &stream, } @@ -350,7 +353,7 @@ func DefaultUnaryMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.UnaryS NewUnaryMiddleware(). WithName(DefaultInternalMiddlewareConsistency). - WithInterceptor(consistencymw.UnaryServerInterceptor(opts.MiddlewareServiceLabel)). + WithInterceptor(consistencymw.UnaryServerInterceptor(opts.MiddlewareServiceLabel, opts.MismatchingZedTokenOption)). Done(), NewUnaryMiddleware(). @@ -428,7 +431,7 @@ func DefaultStreamingMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.St NewStreamMiddleware(). WithName(DefaultInternalMiddlewareConsistency). - WithInterceptor(consistencymw.StreamServerInterceptor(opts.MiddlewareServiceLabel)). + WithInterceptor(consistencymw.StreamServerInterceptor(opts.MiddlewareServiceLabel, opts.MismatchingZedTokenOption)). Done(), NewStreamMiddleware(). diff --git a/pkg/cmd/server/defaults_test.go b/pkg/cmd/server/defaults_test.go index 5b3ea59f60..c83ec5dee2 100644 --- a/pkg/cmd/server/defaults_test.go +++ b/pkg/cmd/server/defaults_test.go @@ -12,6 +12,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/dispatch" "github.com/authzed/spicedb/internal/middleware/pertoken" + "github.com/authzed/spicedb/pkg/middleware/consistency" ) func TestWithDatastore(t *testing.T) { @@ -30,6 +31,7 @@ func TestWithDatastore(t *testing.T) { true, false, "service", + consistency.TreatMismatchingTokensAsError, nil, nil, } @@ -48,6 +50,7 @@ func TestWithDatastore(t *testing.T) { require.Equal(t, opts.EnableResponseLog, withDS.EnableResponseLog) require.Equal(t, opts.DisableGRPCHistogram, withDS.DisableGRPCHistogram) require.Equal(t, opts.MiddlewareServiceLabel, withDS.MiddlewareServiceLabel) + require.Equal(t, opts.MismatchingZedTokenOption, withDS.MismatchingZedTokenOption) _, authError := withDS.AuthFunc(context.Background()) require.Error(t, authError) @@ -70,6 +73,7 @@ func TestWithDatastoreMiddleware(t *testing.T) { true, false, "anotherservice", + consistency.TreatMismatchingTokensAsError, nil, nil, } @@ -87,6 +91,7 @@ func TestWithDatastoreMiddleware(t *testing.T) { require.Equal(t, opts.EnableResponseLog, withDS.EnableResponseLog) require.Equal(t, opts.DisableGRPCHistogram, withDS.DisableGRPCHistogram) require.Equal(t, opts.MiddlewareServiceLabel, withDS.MiddlewareServiceLabel) + require.Equal(t, opts.MismatchingZedTokenOption, withDS.MismatchingZedTokenOption) _, authError := withDS.AuthFunc(context.Background()) require.Error(t, authError) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 3ded6efb7a..5dfa3a6009 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -43,6 +43,7 @@ import ( datastorecfg "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/middleware/consistency" "github.com/authzed/spicedb/pkg/middleware/requestid" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -121,6 +122,7 @@ type Config struct { MaxBulkExportRelationshipsLimit uint32 `debugmap:"visible"` EnableExperimentalLookupResources bool `debugmap:"visible"` EnableExperimentalRelationshipExpiration bool `debugmap:"visible"` + MismatchZedTokenBehavior string `debugmap:"visible"` // Additional Services MetricsAPI util.HTTPServerConfig `debugmap:"visible"` @@ -385,6 +387,24 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { serverName = "spicedb" } + var mismatchZedTokenOption consistency.MismatchingTokenOption + switch c.MismatchZedTokenBehavior { + case "": + fallthrough + + case "full-consistency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsFullConsistency + + case "min-latency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsMinLatency + + case "error": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsError + + default: + return nil, fmt.Errorf("unknown mismatched zedtoken behavior: %s", c.MismatchZedTokenBehavior) + } + opts := MiddlewareOption{ log.Logger, c.GRPCAuthFunc, @@ -394,6 +414,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { c.EnableResponseLogs, c.DisableGRPCLatencyHistogram, serverName, + mismatchZedTokenOption, nil, nil, } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index b36e405da5..c5b6a5fb92 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -11,6 +11,7 @@ import ( "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" + "github.com/authzed/spicedb/pkg/middleware/consistency" "github.com/authzed/spicedb/pkg/testutil" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" @@ -231,7 +232,7 @@ func TestModifyUnaryMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", nil, nil} + opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, nil, nil} opt = opt.WithDatastore(nil) defaultMw, err := DefaultUnaryMiddleware(opt) @@ -259,7 +260,7 @@ func TestModifyStreamingMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", nil, nil} + opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, nil, nil} opt = opt.WithDatastore(nil) defaultMw, err := DefaultStreamingMiddleware(opt) diff --git a/pkg/cmd/server/zz_generated.middlewareoption.go b/pkg/cmd/server/zz_generated.middlewareoption.go index 79ff8c9f56..ed1dd3430c 100644 --- a/pkg/cmd/server/zz_generated.middlewareoption.go +++ b/pkg/cmd/server/zz_generated.middlewareoption.go @@ -3,6 +3,7 @@ package server import ( dispatch "github.com/authzed/spicedb/internal/dispatch" + consistency "github.com/authzed/spicedb/pkg/middleware/consistency" defaults "github.com/creasty/defaults" helpers "github.com/ecordell/optgen/helpers" auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" @@ -41,6 +42,7 @@ func (m *MiddlewareOption) ToOption() MiddlewareOptionOption { to.EnableResponseLog = m.EnableResponseLog to.DisableGRPCHistogram = m.DisableGRPCHistogram to.MiddlewareServiceLabel = m.MiddlewareServiceLabel + to.MismatchingZedTokenOption = m.MismatchingZedTokenOption to.unaryDatastoreMiddleware = m.unaryDatastoreMiddleware to.streamDatastoreMiddleware = m.streamDatastoreMiddleware } @@ -54,6 +56,7 @@ func (m MiddlewareOption) DebugMap() map[string]any { debugMap["EnableResponseLog"] = helpers.DebugValue(m.EnableResponseLog, false) debugMap["DisableGRPCHistogram"] = helpers.DebugValue(m.DisableGRPCHistogram, false) debugMap["MiddlewareServiceLabel"] = helpers.DebugValue(m.MiddlewareServiceLabel, false) + debugMap["MismatchingZedTokenOption"] = helpers.DebugValue(m.MismatchingZedTokenOption, false) return debugMap } @@ -128,3 +131,10 @@ func WithMiddlewareServiceLabel(middlewareServiceLabel string) MiddlewareOptionO m.MiddlewareServiceLabel = middlewareServiceLabel } } + +// WithMismatchingZedTokenOption returns an option that can set MismatchingZedTokenOption on a MiddlewareOption +func WithMismatchingZedTokenOption(mismatchingZedTokenOption consistency.MismatchingTokenOption) MiddlewareOptionOption { + return func(m *MiddlewareOption) { + m.MismatchingZedTokenOption = mismatchingZedTokenOption + } +} diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index 8e2dfaddef..432a3dd1b7 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -89,6 +89,7 @@ func (c *Config) ToOption() ConfigOption { to.MaxBulkExportRelationshipsLimit = c.MaxBulkExportRelationshipsLimit to.EnableExperimentalLookupResources = c.EnableExperimentalLookupResources to.EnableExperimentalRelationshipExpiration = c.EnableExperimentalRelationshipExpiration + to.MismatchZedTokenBehavior = c.MismatchZedTokenBehavior to.MetricsAPI = c.MetricsAPI to.UnaryMiddlewareModification = c.UnaryMiddlewareModification to.StreamingMiddlewareModification = c.StreamingMiddlewareModification @@ -158,6 +159,7 @@ func (c Config) DebugMap() map[string]any { debugMap["MaxBulkExportRelationshipsLimit"] = helpers.DebugValue(c.MaxBulkExportRelationshipsLimit, false) debugMap["EnableExperimentalLookupResources"] = helpers.DebugValue(c.EnableExperimentalLookupResources, false) debugMap["EnableExperimentalRelationshipExpiration"] = helpers.DebugValue(c.EnableExperimentalRelationshipExpiration, false) + debugMap["MismatchZedTokenBehavior"] = helpers.DebugValue(c.MismatchZedTokenBehavior, false) debugMap["MetricsAPI"] = helpers.DebugValue(c.MetricsAPI, false) debugMap["SilentlyDisableTelemetry"] = helpers.DebugValue(c.SilentlyDisableTelemetry, false) debugMap["TelemetryCAOverridePath"] = helpers.DebugValue(c.TelemetryCAOverridePath, false) @@ -570,6 +572,13 @@ func WithEnableExperimentalRelationshipExpiration(enableExperimentalRelationship } } +// WithMismatchZedTokenBehavior returns an option that can set MismatchZedTokenBehavior on a Config +func WithMismatchZedTokenBehavior(mismatchZedTokenBehavior string) ConfigOption { + return func(c *Config) { + c.MismatchZedTokenBehavior = mismatchZedTokenBehavior + } +} + // WithMetricsAPI returns an option that can set MetricsAPI on a Config func WithMetricsAPI(metricsAPI util.HTTPServerConfig) ConfigOption { return func(c *Config) { diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go index 4233efaa5c..67c3f1ef5c 100644 --- a/pkg/cursor/cursor.go +++ b/pkg/cursor/cursor.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "encoding/base64" "errors" "fmt" @@ -11,6 +12,7 @@ import ( dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" impl "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/zedtoken" ) // Encode converts a decoded cursor to its opaque version. @@ -109,25 +111,39 @@ func DecodeToDispatchCursor(encoded *v1.Cursor, callAndParameterHash string) (*d // DecodeToDispatchRevision decodes an encoded API cursor into an internal dispatch revision. // NOTE: this method does *not* verify the caller's method signature. -func DecodeToDispatchRevision(encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, error) { +func DecodeToDispatchRevision(ctx context.Context, encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, zedtoken.TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return nil, err + return nil, zedtoken.StatusUnknown, err } v1decoded := decoded.GetV1() if v1decoded == nil { - return nil, ErrNilCursor + return nil, zedtoken.StatusUnknown, ErrNilCursor + } + + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, zedtoken.StatusUnknown, fmt.Errorf(errEncodeError, err) } parsed, err := ds.RevisionFromString(v1decoded.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, zedtoken.StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if v1decoded.DatastoreUniqueId == "" { + return parsed, zedtoken.StatusLegacyEmptyDatastoreID, nil + } + + if v1decoded.DatastoreUniqueId != datastoreUniqueID { + return parsed, zedtoken.StatusMismatchedDatastoreID, nil } - return parsed, nil + return parsed, zedtoken.StatusValid, nil } type revisionDecoder interface { + UniqueID(_ context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/cursor/cursor_test.go b/pkg/cursor/cursor_test.go index 488c3d56bf..115756f303 100644 --- a/pkg/cursor/cursor_test.go +++ b/pkg/cursor/cursor_test.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "fmt" "testing" @@ -59,7 +60,7 @@ func TestEncodeDecode(t *testing.T) { require.Equal(tc.sections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(encoded, revisions.CommonDecoder{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -137,7 +138,7 @@ func TestDecode(t *testing.T) { require.NotNil(decoded) require.Equal(testCase.expectedSections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(&v1.Cursor{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), &v1.Cursor{ Token: testCase.token, }, revisions.CommonDecoder{ Kind: revisions.TransactionID, diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index 59738b0551..6a846a1518 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -158,11 +158,11 @@ func (dc *DevContext) RunV1InMemoryService() (*grpc.ClientConn, func(), error) { s := grpc.NewServer( grpc.ChainUnaryInterceptor( datastoremw.UnaryServerInterceptor(dc.Datastore), - consistency.UnaryServerInterceptor("development"), + consistency.UnaryServerInterceptor("development", consistency.TreatMismatchingTokensAsError), ), grpc.ChainStreamInterceptor( datastoremw.StreamServerInterceptor(dc.Datastore), - consistency.StreamServerInterceptor("development"), + consistency.StreamServerInterceptor("development", consistency.TreatMismatchingTokensAsError), ), ) ps := v1svc.NewPermissionsServer(dc.Dispatcher, v1svc.PermissionsServerConfig{ diff --git a/pkg/middleware/consistency/consistency.go b/pkg/middleware/consistency/consistency.go index 8f75a19252..d1da65131a 100644 --- a/pkg/middleware/consistency/consistency.go +++ b/pkg/middleware/consistency/consistency.go @@ -18,6 +18,7 @@ import ( "github.com/authzed/spicedb/internal/services/shared" "github.com/authzed/spicedb/pkg/cursor" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/zedtoken" ) @@ -28,6 +29,27 @@ var ConsistencyCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "Count of the consistencies used per request", }, []string{"method", "source", "service"}) +// MismatchingTokenOption is the option specifying the behavior of the consistency middleware +// when a ZedToken provided references a different datastore instance than the current +// datastore instance. +type MismatchingTokenOption int + +const ( + // TreatMismatchingTokensAsFullConsistency specifies that the middleware should treat + // a ZedToken that references a different datastore instance as a request for full + // consistency. + TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota + + // TreatMismatchingTokensAsMinLatency specifies that the middleware should treat + // a ZedToken that references a different datastore instance as a request for min + // latency. + TreatMismatchingTokensAsMinLatency + + // TreatMismatchingTokensAsError specifies that the middleware should raise an error + // when a ZedToken that references a different datastore instance is provided. + TreatMismatchingTokensAsError +) + type hasConsistency interface{ GetConsistency() *v1.Consistency } type hasOptionalCursor interface{ GetOptionalCursor() *v1.Cursor } @@ -55,7 +77,17 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, handle := c.(*revisionHandle) rev := handle.revision if rev != nil { - return rev, zedtoken.MustNewFromRevision(rev), nil + ds := datastoremw.FromContext(ctx) + if ds == nil { + return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore") + } + + zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds) + if err != nil { + return nil, nil, err + } + + return rev, zedToken, nil } } @@ -64,10 +96,10 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, // AddRevisionToContext adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string) error { +func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string, option MismatchingTokenOption) error { switch req := req.(type) { case hasConsistency: - return addRevisionToContextFromConsistency(ctx, req, ds, serviceLabel) + return addRevisionToContextFromConsistency(ctx, req, ds, serviceLabel, option) default: return nil } @@ -75,7 +107,7 @@ func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Dat // addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, serviceLabel string) error { +func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, serviceLabel string, option MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil @@ -93,7 +125,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency ConsistencyCounter.WithLabelValues("snapshot", "cursor", serviceLabel).Inc() } - requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds) + requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -137,7 +169,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency case consistency.GetAtLeastAsFresh() != nil: // At least as fresh as: Pick one of the datastore's revision and that specified, which // ever is later. - picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds) + picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -159,11 +191,15 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency ConsistencyCounter.WithLabelValues("snapshot", "request", serviceLabel).Inc() } - requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) + requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) if err != nil { return errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + return fmt.Errorf("ZedToken specified references a different datastore instance but at-exact-snapshot was requested") + } + err = ds.CheckRevision(ctx, requestedRev) if err != nil { return rewriteDatastoreError(ctx, err) @@ -187,7 +223,7 @@ var bypassServiceWhitelist = map[string]struct{}{ // UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func UnaryServerInterceptor(serviceLabel string) grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(serviceLabel string, option MismatchingTokenOption) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { @@ -196,7 +232,7 @@ func UnaryServerInterceptor(serviceLabel string) grpc.UnaryServerInterceptor { } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := AddRevisionToContext(newCtx, req, ds, serviceLabel); err != nil { + if err := AddRevisionToContext(newCtx, req, ds, serviceLabel, option); err != nil { return nil, err } @@ -206,14 +242,14 @@ func UnaryServerInterceptor(serviceLabel string) grpc.UnaryServerInterceptor { // StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func StreamServerInterceptor(serviceLabel string) grpc.StreamServerInterceptor { +func StreamServerInterceptor(serviceLabel string, option MismatchingTokenOption) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, AddRevisionToContext} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, option, AddRevisionToContext} return handler(srv, wrapper) } } @@ -222,7 +258,8 @@ type recvWrapper struct { grpc.ServerStream ctx context.Context serviceLabel string - handler func(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string) error + option MismatchingTokenOption + handler func(context.Context, interface{}, datastore.Datastore, string, MismatchingTokenOption) error } func (s *recvWrapper) Context() context.Context { return s.ctx } @@ -232,12 +269,12 @@ func (s *recvWrapper) RecvMsg(m interface{}) error { return err } ds := datastoremw.MustFromContext(s.ctx) - return s.handler(s.ctx, m, ds, s.serviceLabel) + return s.handler(s.ctx, m, ds, s.serviceLabel, s.option) } // pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most // recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise. -func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) { +func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) { // Calculate a revision as we see fit databaseRev, err := ds.OptimizedRevision(ctx) if err != nil { @@ -245,11 +282,35 @@ func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore. } if requested != nil { - requestedRev, err := zedtoken.DecodeRevision(requested, ds) + requestedRev, status, err := zedtoken.DecodeRevision(requested, ds) if err != nil { return datastore.NoRevision, false, errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + switch option { + case TreatMismatchingTokensAsFullConsistency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references a different datastore instance and SpiceDB is configured to treat this as a full consistency request") + headRev, err := ds.HeadRevision(ctx) + if err != nil { + return datastore.NoRevision, false, err + } + + return headRev, false, nil + + case TreatMismatchingTokensAsMinLatency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references a different datastore instance and SpiceDB is configured to treat this as a min latency request") + return databaseRev, false, nil + + case TreatMismatchingTokensAsError: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references a different datastore instance and SpiceDB is configured to raise an error in this scenario") + return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references a different datastore instance and SpiceDB is configured to raise an error in this scenario") + + default: + return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option) + } + } + if databaseRev.GreaterThan(requestedRev) { return databaseRev, false, nil } diff --git a/pkg/middleware/consistency/consistency_test.go b/pkg/middleware/consistency/consistency_test.go index a5f0b4f516..674f0a38ed 100644 --- a/pkg/middleware/consistency/consistency_test.go +++ b/pkg/middleware/consistency/consistency_test.go @@ -10,6 +10,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" "github.com/authzed/spicedb/internal/datastore/revisions" + datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/pkg/cursor" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/zedtoken" @@ -29,7 +30,9 @@ func TestAddRevisionToContextNoneSupplied(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) - err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, "somelabel") + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -46,13 +49,15 @@ func TestAddRevisionToContextMinimizeLatency(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{ MinimizeLatency: true, }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -69,13 +74,15 @@ func TestAddRevisionToContextFullyConsistent(t *testing.T) { ds.On("HeadRevision").Return(head, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_FullyConsistent{ FullyConsistent: true, }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -93,13 +100,15 @@ func TestAddRevisionToContextAtLeastAsFresh(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(exact), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -117,13 +126,15 @@ func TestAddRevisionToContextAtValidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -141,13 +152,15 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", zero.String()).Return(zero, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(zero), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(zero), }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.Error(err) ds.AssertExpectations(t) } @@ -155,7 +168,10 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { func TestAddRevisionToContextNoConsistencyAPI(t *testing.T) { require := require.New(t) + ds := &proxy_test.MockDatastore{} + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) _, _, err := RevisionFromContext(updated) require.Error(err) @@ -174,14 +190,16 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { // revision in context is at `exact` updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, OptionalCursor: cursor, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) // ensure we get back `optimized` from the cursor @@ -191,3 +209,153 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { require.True(optimized.Equal(rev)) ds.AssertExpectations(t) } + +func TestAtExactSnapshotWithMismatchedToken(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore but at-exact-snapshot") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectError(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectMinLatency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsMinLatency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(optimized.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectFullConsistency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("HeadRevision").Return(head, nil).Once() + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsFullConsistency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(head.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAddRevisionToContextAtLeastAsFreshMatchingIDs(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() + + ds.CurrentUniqueID = "foo" + + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsError) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(exact.Equal(rev)) + ds.AssertExpectations(t) +} diff --git a/pkg/middleware/consistency/forcefull.go b/pkg/middleware/consistency/forcefull.go index 0ec88f7a4c..1d196f0a60 100644 --- a/pkg/middleware/consistency/forcefull.go +++ b/pkg/middleware/consistency/forcefull.go @@ -21,7 +21,7 @@ func ForceFullConsistencyUnaryServerInterceptor(serviceLabel string) grpc.UnaryS } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := setFullConsistencyRevisionToContext(newCtx, req, ds, serviceLabel); err != nil { + if err := setFullConsistencyRevisionToContext(newCtx, req, ds, serviceLabel, TreatMismatchingTokensAsFullConsistency); err != nil { return nil, err } @@ -38,12 +38,12 @@ func ForceFullConsistencyStreamServerInterceptor(serviceLabel string) grpc.Strea return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, setFullConsistencyRevisionToContext} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, TreatMismatchingTokensAsFullConsistency, setFullConsistencyRevisionToContext} return handler(srv, wrapper) } } -func setFullConsistencyRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string) error { +func setFullConsistencyRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string, _ MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil diff --git a/pkg/proto/impl/v1/impl.pb.go b/pkg/proto/impl/v1/impl.pb.go index 67ee033f1d..6c7a448b5d 100644 --- a/pkg/proto/impl/v1/impl.pb.go +++ b/pkg/proto/impl/v1/impl.pb.go @@ -402,6 +402,9 @@ type V1Cursor struct { DispatchVersion uint32 `protobuf:"varint,4,opt,name=dispatch_version,json=dispatchVersion,proto3" json:"dispatch_version,omitempty"` // flags are flags set by the API caller. Flags map[string]string `protobuf:"bytes,5,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + DatastoreUniqueId string `protobuf:"bytes,6,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *V1Cursor) Reset() { @@ -471,6 +474,13 @@ func (x *V1Cursor) GetFlags() map[string]string { return nil } +func (x *V1Cursor) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + type DocComment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -814,6 +824,9 @@ type DecodedZedToken_V1ZedToken struct { unknownFields protoimpl.UnknownFields Revision string `protobuf:"bytes,1,opt,name=revision,proto3" json:"revision,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + DatastoreUniqueId string `protobuf:"bytes,2,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *DecodedZedToken_V1ZedToken) Reset() { @@ -855,6 +868,13 @@ func (x *DecodedZedToken_V1ZedToken) GetRevision() string { return "" } +func (x *DecodedZedToken_V1ZedToken) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + var File_impl_v1_impl_proto protoreflect.FileDescriptor var file_impl_v1_impl_proto_rawDesc = []byte{ @@ -884,7 +904,7 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x08, 0x56, 0x32, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x82, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, + 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0xb2, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x55, 0x0a, 0x14, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x76, 0x31, 0x5f, 0x7a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, @@ -897,15 +917,18 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x65, 0x6e, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x1a, 0x26, 0x0a, 0x08, 0x56, 0x31, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, - 0x1a, 0x28, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, + 0x1a, 0x58, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, + 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x45, 0x0a, 0x0d, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x23, 0x0a, 0x02, 0x76, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, - 0x6f, 0x66, 0x22, 0x94, 0x02, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, + 0x6f, 0x66, 0x22, 0xc4, 0x02, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x73, @@ -918,7 +941,10 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x2e, 0x46, 0x6c, - 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x1a, + 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x12, + 0x2e, 0x0a, 0x13, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, + 0x71, 0x75, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x1a, 0x38, 0x0a, 0x0a, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, diff --git a/pkg/proto/impl/v1/impl.pb.validate.go b/pkg/proto/impl/v1/impl.pb.validate.go index dea8479bb2..a52eece1a5 100644 --- a/pkg/proto/impl/v1/impl.pb.validate.go +++ b/pkg/proto/impl/v1/impl.pb.validate.go @@ -735,6 +735,8 @@ func (m *V1Cursor) validate(all bool) error { // no validation rules for Flags + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return V1CursorMultiError(errors) } @@ -1591,6 +1593,8 @@ func (m *DecodedZedToken_V1ZedToken) validate(all bool) error { // no validation rules for Revision + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return DecodedZedToken_V1ZedTokenMultiError(errors) } diff --git a/pkg/proto/impl/v1/impl_vtproto.pb.go b/pkg/proto/impl/v1/impl_vtproto.pb.go index 95e368513f..074fd1534c 100644 --- a/pkg/proto/impl/v1/impl_vtproto.pb.go +++ b/pkg/proto/impl/v1/impl_vtproto.pb.go @@ -154,6 +154,7 @@ func (m *DecodedZedToken_V1ZedToken) CloneVT() *DecodedZedToken_V1ZedToken { } r := new(DecodedZedToken_V1ZedToken) r.Revision = m.Revision + r.DatastoreUniqueId = m.DatastoreUniqueId if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -242,6 +243,7 @@ func (m *V1Cursor) CloneVT() *V1Cursor { r.Revision = m.Revision r.CallAndParametersHash = m.CallAndParametersHash r.DispatchVersion = m.DispatchVersion + r.DatastoreUniqueId = m.DatastoreUniqueId if rhs := m.Sections; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -549,6 +551,9 @@ func (this *DecodedZedToken_V1ZedToken) EqualVT(that *DecodedZedToken_V1ZedToken if this.Revision != that.Revision { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -726,6 +731,9 @@ func (this *V1Cursor) EqualVT(that *V1Cursor) bool { return false } } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1152,6 +1160,13 @@ func (m *DecodedZedToken_V1ZedToken) MarshalToSizedBufferVT(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x12 + } if len(m.Revision) > 0 { i -= len(m.Revision) copy(dAtA[i:], m.Revision) @@ -1345,6 +1360,13 @@ func (m *V1Cursor) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x32 + } if len(m.Flags) > 0 { for k := range m.Flags { v := m.Flags[k] @@ -1696,6 +1718,10 @@ func (m *DecodedZedToken_V1ZedToken) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1799,6 +1825,10 @@ func (m *V1Cursor) SizeVT() (n int) { n += mapEntrySize + 1 + protohelpers.SizeOfVarint(uint64(mapEntrySize)) } } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -2440,6 +2470,38 @@ func (m *DecodedZedToken_V1ZedToken) UnmarshalVT(dAtA []byte) error { } m.Revision = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2958,6 +3020,38 @@ func (m *V1Cursor) UnmarshalVT(dAtA []byte) error { } m.Flags[mapkey] = mapvalue iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/zedtoken/zedtoken.go b/pkg/zedtoken/zedtoken.go index 797b3e184b..f655b74565 100644 --- a/pkg/zedtoken/zedtoken.go +++ b/pkg/zedtoken/zedtoken.go @@ -2,6 +2,7 @@ package zedtoken import ( + "context" "encoding/base64" "errors" "fmt" @@ -22,9 +23,32 @@ const ( // zedtoken argument to Decode var ErrNilZedToken = errors.New("zedtoken pointer was nil") -// MustNewFromRevision generates an encoded zedtoken from an integral revision. -func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { - encoded, err := NewFromRevision(revision) +// legacyEmptyDatastoreID is the empty datastore ID for legacy tokens and cursors. +const legacyEmptyDatastoreID = "" + +// TokenStatus is the status of a zedtoken. +type TokenStatus int + +const ( + // StatusUnknown indicates that the status of the zedtoken is unknown. + StatusUnknown TokenStatus = iota + + // StatusLegacyEmptyDatastoreID indicates that the zedtoken is a legacy token + // with an empty datastore ID. + StatusLegacyEmptyDatastoreID + + // StatusValid indicates that the zedtoken is valid. + StatusValid + + // StatusMismatchedDatastoreID indicates that the zedtoken is valid, but the + // datastore ID does not match the current datastore, indicating that the + // token was generated by a different datastore. + StatusMismatchedDatastoreID +) + +// MustNewFromRevisionForTesting generates an encoded zedtoken from an integral revision. +func MustNewFromRevisionForTesting(revision datastore.Revision) *v1.ZedToken { + encoded, err := newFromRevision(revision, legacyEmptyDatastoreID) if err != nil { panic(err) } @@ -32,11 +56,21 @@ func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { } // NewFromRevision generates an encoded zedtoken from an integral revision. -func NewFromRevision(revision datastore.Revision) (*v1.ZedToken, error) { +func NewFromRevision(ctx context.Context, revision datastore.Revision, ds datastore.Datastore) (*v1.ZedToken, error) { + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, fmt.Errorf(errEncodeError, err) + } + + return newFromRevision(revision, datastoreUniqueID) +} + +func newFromRevision(revision datastore.Revision, datastoreUniqueID string) (*v1.ZedToken, error) { toEncode := &zedtoken.DecodedZedToken{ VersionOneof: &zedtoken.DecodedZedToken_V1{ V1: &zedtoken.DecodedZedToken_V1ZedToken{ - Revision: revision.String(), + Revision: revision.String(), + DatastoreUniqueId: datastoreUniqueID, }, }, } @@ -77,10 +111,10 @@ func Decode(encoded *v1.ZedToken) (*zedtoken.DecodedZedToken, error) { } // DecodeRevision converts and extracts the revision from a zedtoken or legacy zookie. -func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, error) { +func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return datastore.NoRevision, err + return datastore.NoRevision, StatusUnknown, err } switch ver := decoded.VersionOneof.(type) { @@ -88,21 +122,36 @@ func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revisio revString := fmt.Sprintf("%d", ver.DeprecatedV1Zookie.Revision) parsed, err := ds.RevisionFromString(revString) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + return parsed, StatusLegacyEmptyDatastoreID, nil case *zedtoken.DecodedZedToken_V1: parsed, err := ds.RevisionFromString(ver.V1.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + + if ver.V1.DatastoreUniqueId == legacyEmptyDatastoreID { + return parsed, StatusLegacyEmptyDatastoreID, nil + } + + datastoreUniqueID, err := ds.UniqueID(context.Background()) + if err != nil { + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if ver.V1.DatastoreUniqueId != datastoreUniqueID { + return parsed, StatusMismatchedDatastoreID, nil + } + + return parsed, StatusValid, nil default: - return datastore.NoRevision, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) } } type revisionDecoder interface { + UniqueID(context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/zedtoken/zedtoken_test.go b/pkg/zedtoken/zedtoken_test.go index 09bbc4d48e..85df77aa4d 100644 --- a/pkg/zedtoken/zedtoken_test.go +++ b/pkg/zedtoken/zedtoken_test.go @@ -41,10 +41,9 @@ func TestZedTokenEncode(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -58,10 +57,9 @@ func TestZedTokenEncodeHLC(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.HybridLogicalClock, }) require.NoError(err) @@ -71,65 +69,92 @@ func TestZedTokenEncodeHLC(t *testing.T) { } var decodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { format: "invalid", token: "abc", expectedRevision: datastore.NoRevision, + expectedStatus: StatusUnknown, expectError: true, }, { format: "V1 Zookie", token: "CAESAA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggB", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggC", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAwiAAg==", expectedRevision: revisions.NewForTransactionID(256), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAIaAwoBMA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMQ==", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMg==", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBNA==", expectedRevision: revisions.NewForTransactionID(4), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, + { + format: "V1 ZedToken with matching datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "someuniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusValid, + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "anotheruniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusMismatchedDatastoreID, + expectError: false, + }, } func TestDecode(t *testing.T) { @@ -139,15 +164,17 @@ func TestDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.TransactionID, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.TransactionID, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", @@ -160,14 +187,17 @@ func TestDecode(t *testing.T) { } var hlcDecodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { - format: "V1 ZedToken", - token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + format: "V1 ZedToken", + token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + expectedStatus: StatusLegacyEmptyDatastoreID, expectedRevision: func() datastore.Revision { r, err := revisions.NewForHLC(decimal.NewFromInt(1621538189028928000)) if err != nil { @@ -175,11 +205,47 @@ var hlcDecodeTests = []struct { } return r }(), + }, + { + format: "V1 ZedToken", + token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + expectedStatus: StatusLegacyEmptyDatastoreID, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), expectError: false, }, { - format: "V1 ZedToken", - token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + format: "V1 ZedToken with matching datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusValid, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "arrrg-6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusMismatchedDatastoreID, expectedRevision: (func() datastore.Revision { v, err := decimal.NewFromString("1693540940373045727.0000000001") if err != nil { @@ -204,15 +270,17 @@ func TestHLCDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.HybridLogicalClock, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.HybridLogicalClock, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", diff --git a/proto/internal/impl/v1/impl.proto b/proto/internal/impl/v1/impl.proto index 836d833910..446f1bc84b 100644 --- a/proto/internal/impl/v1/impl.proto +++ b/proto/internal/impl/v1/impl.proto @@ -33,6 +33,10 @@ message DecodedZedToken { } message V1ZedToken { string revision = 1; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + string datastore_unique_id = 2; } oneof version_oneof { V1Zookie deprecated_v1_zookie = 2; @@ -63,6 +67,10 @@ message V1Cursor { // flags are flags set by the API caller. map flags = 5; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + string datastore_unique_id = 6; } message DocComment {