From 0dc015bfbe9324e229f6970bb67ae83f322c2237 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 29 Jan 2024 15:58:39 -0500 Subject: [PATCH] Setup defined (and configurable) behavior if a ZedToken from an older datastore is used All ZedTokens are now minted with the datastore's unique ID included in the ZedToken and that ID is checked when the ZedToken is decoded. In scenarios where the datastore ID does not match, either an error is raised (watch, at_exact_snapshot) or configurable behavior is used (at_least_as_fresh) Fixes #1541 --- e2e/newenemy/newenemy_test.go | 4 +- internal/datastore/proxy/proxy_test/mock.go | 8 +- .../datastore/proxy/relationshipintegrity.go | 4 + .../datastore/revisions/commonrevision.go | 9 +- .../services/integrationtesting/cert_test.go | 8 +- .../consistencytestutil/servicetester.go | 14 +- .../services/integrationtesting/perf_test.go | 2 +- internal/services/v1/debug_test.go | 2 +- internal/services/v1/experimental.go | 28 ++- internal/services/v1/expreflection.go | 10 +- internal/services/v1/expreflection_test.go | 10 +- internal/services/v1/metadata_test.go | 12 +- internal/services/v1/permissions_test.go | 36 ++-- internal/services/v1/relationships.go | 14 +- internal/services/v1/relationships_test.go | 17 +- internal/services/v1/schema.go | 14 +- internal/services/v1/watch.go | 13 +- internal/services/v1/watch_test.go | 2 +- internal/testserver/server.go | 4 +- pkg/cmd/serve.go | 1 + pkg/cmd/server/defaults.go | 23 ++- pkg/cmd/server/defaults_test.go | 5 + pkg/cmd/server/server.go | 21 ++ pkg/cmd/server/server_test.go | 5 +- .../server/zz_generated.middlewareoption.go | 10 + pkg/cmd/server/zz_generated.options.go | 9 + pkg/cursor/cursor.go | 26 ++- pkg/cursor/cursor_test.go | 5 +- pkg/development/devcontext.go | 4 +- pkg/middleware/consistency/consistency.go | 91 +++++++-- .../consistency/consistency_test.go | 190 +++++++++++++++++- pkg/middleware/consistency/forcefull.go | 6 +- pkg/proto/impl/v1/impl.pb.go | 36 +++- pkg/proto/impl/v1/impl.pb.validate.go | 4 + pkg/proto/impl/v1/impl_vtproto.pb.go | 94 +++++++++ pkg/zedtoken/zedtoken.go | 73 +++++-- pkg/zedtoken/zedtoken_test.go | 112 +++++++++-- proto/internal/impl/v1/impl.proto | 8 + 38 files changed, 769 insertions(+), 165 deletions(-) diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go index 7c37737442..e2352367d0 100644 --- a/e2e/newenemy/newenemy_test.go +++ b/e2e/newenemy/newenemy_test.go @@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb require.NoError(t, err) t.Log("r2 token: ", r2.WrittenAt.Token) - z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) - z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) t.Log("z1 revision: ", z1) t.Log("z2 revision: ", z2) diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index b204fca607..9bae1bdd72 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -15,10 +15,16 @@ import ( type MockDatastore struct { mock.Mock + + CurrentUniqueID string } func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { - return "mockds", nil + if dm.CurrentUniqueID == "" { + return "mockds", nil + } + + return dm.CurrentUniqueID, nil } func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/proxy/relationshipintegrity.go b/internal/datastore/proxy/relationshipintegrity.go index 4b129b14d1..aea0fb34e7 100644 --- a/internal/datastore/proxy/relationshipintegrity.go +++ b/internal/datastore/proxy/relationshipintegrity.go @@ -197,6 +197,10 @@ func (r *relationshipIntegrityProxy) OptimizedRevision(ctx context.Context) (dat return r.ds.OptimizedRevision(ctx) } +func (r *relationshipIntegrityProxy) UniqueID(ctx context.Context) (string, error) { + return r.ds.UniqueID(ctx) +} + func (r *relationshipIntegrityProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) { return r.ds.ReadyState(ctx) } diff --git a/internal/datastore/revisions/commonrevision.go b/internal/datastore/revisions/commonrevision.go index 709272818e..dc1d079924 100644 --- a/internal/datastore/revisions/commonrevision.go +++ b/internal/datastore/revisions/commonrevision.go @@ -1,6 +1,8 @@ package revisions import ( + "context" + "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc { // CommonDecoder is a revision decoder that can decode revisions of a given kind. type CommonDecoder struct { - Kind RevisionKind + Kind RevisionKind + DatastoreUniqueID string +} + +func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) { + return cd.DatastoreUniqueID, nil } func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) { diff --git a/internal/services/integrationtesting/cert_test.go b/internal/services/integrationtesting/cert_test.go index 465122a34e..cb33bb8281 100644 --- a/internal/services/integrationtesting/cert_test.go +++ b/internal/services/integrationtesting/cert_test.go @@ -148,7 +148,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor("testing"), + Middleware: consistency.UnaryServerInterceptor("testing", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -167,7 +167,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor("testing"), + Middleware: consistency.StreamServerInterceptor("testing", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -211,7 +211,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, @@ -264,7 +264,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/integrationtesting/consistencytestutil/servicetester.go b/internal/services/integrationtesting/consistencytestutil/servicetester.go index f4783ae8ea..e3fad25b3f 100644 --- a/internal/services/integrationtesting/consistencytestutil/servicetester.go +++ b/internal/services/integrationtesting/consistencytestutil/servicetester.go @@ -78,7 +78,7 @@ func (v1st v1ServiceTester) Check(ctx context.Context, resource tuple.ObjectAndR }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: context, @@ -98,7 +98,7 @@ func (v1st v1ServiceTester) Expand(ctx context.Context, resource tuple.ObjectAnd Permission: resource.Relation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -134,7 +134,7 @@ func (v1st v1ServiceTester) Read(_ context.Context, namespaceName string, atRevi }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -181,7 +181,7 @@ func (v1st v1ServiceTester) LookupResources(_ context.Context, resourceRelation }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, OptionalLimit: limit, @@ -230,7 +230,7 @@ func (v1st v1ServiceTester) LookupSubjects(_ context.Context, resource tuple.Obj OptionalSubjectRelation: optionalizeRelation(subjectRelation.Relation), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: builtContext, @@ -260,7 +260,7 @@ func (v1st v1ServiceTester) BulkCheck(ctx context.Context, items []*v1.BulkCheck Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -276,7 +276,7 @@ func (v1st v1ServiceTester) CheckBulk(ctx context.Context, items []*v1.CheckBulk Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) diff --git a/internal/services/integrationtesting/perf_test.go b/internal/services/integrationtesting/perf_test.go index e641ab89fc..e9e7377abf 100644 --- a/internal/services/integrationtesting/perf_test.go +++ b/internal/services/integrationtesting/perf_test.go @@ -58,7 +58,7 @@ func TestBurst(t *testing.T) { _, err := client.CheckPermission(context.Background(), &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/v1/debug_test.go b/internal/services/v1/debug_test.go index 3cf6df6581..481f020b98 100644 --- a/internal/services/v1/debug_test.go +++ b/internal/services/v1/debug_test.go @@ -508,7 +508,7 @@ func TestCheckPermissionWithDebug(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: stc.checkRequest.resource, diff --git a/internal/services/v1/experimental.go b/internal/services/v1/experimental.go index de68d3a678..2e34eeb8bf 100644 --- a/internal/services/v1/experimental.go +++ b/internal/services/v1/experimental.go @@ -525,10 +525,16 @@ func (es *experimentalServer) ExperimentalReflectSchema(ctx context.Context, req } } + ds := datastoremw.MustFromContext(ctx) + readAt, err := zedtoken.NewFromRevision(ctx, atRevision, ds) + if err != nil { + return nil, shared.RewriteErrorWithoutConfig(ctx, err) + } + return &v1.ExperimentalReflectSchemaResponse{ Definitions: definitions, Caveats: caveats, - ReadAt: zedtoken.MustNewFromRevision(atRevision), + ReadAt: readAt, }, nil } @@ -543,12 +549,21 @@ func (es *experimentalServer) ExperimentalDiffSchema(ctx context.Context, req *v return nil, shared.RewriteErrorWithoutConfig(ctx, err) } - resp, err := convertDiff(diff, existingSchema, comparisonSchema, atRevision) + ds := datastoremw.MustFromContext(ctx) + diffs, err := convertDiff(diff, existingSchema, comparisonSchema) if err != nil { return nil, shared.RewriteErrorWithoutConfig(ctx, err) } - return resp, nil + readAt, err := zedtoken.NewFromRevision(ctx, atRevision, ds) + if err != nil { + return nil, shared.RewriteErrorWithoutConfig(ctx, err) + } + + return &v1.ExperimentalDiffSchemaResponse{ + Diffs: diffs, + ReadAt: readAt, + }, nil } func (es *experimentalServer) ExperimentalComputablePermissions(ctx context.Context, req *v1.ExperimentalComputablePermissionsRequest) (*v1.ExperimentalComputablePermissionsResponse, error) { @@ -749,11 +764,16 @@ func (es *experimentalServer) ExperimentalCountRelationships(ctx context.Context return nil, spiceerrors.MustBugf("count should not be negative") } + readAt, err := zedtoken.NewFromRevision(ctx, headRev, ds) + if err != nil { + return nil, shared.RewriteErrorWithoutConfig(ctx, err) + } + return &v1.ExperimentalCountRelationshipsResponse{ CounterResult: &v1.ExperimentalCountRelationshipsResponse_ReadCounterValue{ ReadCounterValue: &v1.ReadCounterValue{ RelationshipCount: uintCount, - ReadAt: zedtoken.MustNewFromRevision(headRev), + ReadAt: readAt, }, }, }, nil diff --git a/internal/services/v1/expreflection.go b/internal/services/v1/expreflection.go index 212cfd7b37..740f68841d 100644 --- a/internal/services/v1/expreflection.go +++ b/internal/services/v1/expreflection.go @@ -9,7 +9,6 @@ import ( "github.com/authzed/spicedb/pkg/caveats" caveattypes "github.com/authzed/spicedb/pkg/caveats/types" - "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/diff" caveatdiff "github.com/authzed/spicedb/pkg/diff/caveats" nsdiff "github.com/authzed/spicedb/pkg/diff/namespace" @@ -18,7 +17,6 @@ import ( iv1 "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" - "github.com/authzed/spicedb/pkg/zedtoken" ) type schemaFilters struct { @@ -180,8 +178,7 @@ func convertDiff( diff *diff.SchemaDiff, existingSchema *diff.DiffableSchema, comparisonSchema *diff.DiffableSchema, - atRevision datastore.Revision, -) (*v1.ExperimentalDiffSchemaResponse, error) { +) ([]*v1.ExpSchemaDiff, error) { size := len(diff.AddedNamespaces) + len(diff.RemovedNamespaces) + len(diff.AddedCaveats) + len(diff.RemovedCaveats) + len(diff.ChangedNamespaces) + len(diff.ChangedCaveats) diffs := make([]*v1.ExpSchemaDiff, 0, size) @@ -513,10 +510,7 @@ func convertDiff( } } - return &v1.ExperimentalDiffSchemaResponse{ - Diffs: diffs, - ReadAt: zedtoken.MustNewFromRevision(atRevision), - }, nil + return diffs, nil } // namespaceAPIReprForName builds an API representation of a namespace. diff --git a/internal/services/v1/expreflection_test.go b/internal/services/v1/expreflection_test.go index fdc902dbc1..6943d97ea0 100644 --- a/internal/services/v1/expreflection_test.go +++ b/internal/services/v1/expreflection_test.go @@ -9,7 +9,6 @@ import ( "github.com/ettle/strcase" "github.com/stretchr/testify/require" - "github.com/authzed/spicedb/pkg/datastore/revisionparsing" "github.com/authzed/spicedb/pkg/diff" "github.com/authzed/spicedb/pkg/genutil/mapz" "github.com/authzed/spicedb/pkg/schemadsl/compiler" @@ -544,21 +543,18 @@ func TestConvertDiff(t *testing.T) { diff, err := diff.DiffSchemas(es, cs) require.NoError(t, err) - resp, err := convertDiff( + diffs, err := convertDiff( diff, &es, &cs, - revisionparsing.MustParseRevisionForTest("1"), ) if err != nil { t.Fatalf("unexpected error: %v", err) } - require.NotNil(t, resp.ReadAt) - resp.ReadAt = nil - testutil.RequireProtoEqual(t, tc.expectedResponse, resp, "got mismatch") + testutil.RequireProtoEqual(t, tc.expectedResponse, &v1.ExperimentalDiffSchemaResponse{Diffs: diffs}, "got mismatch") - for _, diff := range resp.Diffs { + for _, diff := range diffs { name := reflect.TypeOf(diff.GetDiff()).String() encounteredDiffTypes.Add(strings.ToLower(strings.Split(name, "_")[1])) } diff --git a/internal/services/v1/metadata_test.go b/internal/services/v1/metadata_test.go index 3acbc131a4..e8eaa8eb3e 100644 --- a/internal/services/v1/metadata_test.go +++ b/internal/services/v1/metadata_test.go @@ -38,7 +38,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -53,7 +53,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckBulkPermissions(ctx, &v1.CheckBulkPermissionsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Items: []*v1.CheckBulkPermissionsRequestItem{ @@ -96,7 +96,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.ExpandPermissionTree(ctx, &v1.ExpandPermissionTreeRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -130,7 +130,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupResources(ctx, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -155,7 +155,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupSubjects(ctx, &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -188,7 +188,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.ExportBulkRelationships(ctx, &v1.ExportBulkRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) diff --git a/internal/services/v1/permissions_test.go b/internal/services/v1/permissions_test.go index ff0bd11f59..e4d35b06e7 100644 --- a/internal/services/v1/permissions_test.go +++ b/internal/services/v1/permissions_test.go @@ -280,7 +280,7 @@ func TestCheckPermissions(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: tc.resource, @@ -335,7 +335,7 @@ func TestCheckPermissionWithWildcardSubject(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -361,7 +361,7 @@ func TestCheckPermissionWithDebugInfo(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -423,7 +423,7 @@ func TestCheckPermissionWithDebugInfoInError(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "doc1"), @@ -658,7 +658,7 @@ func TestLookupResources(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -735,7 +735,7 @@ func TestExpand(t *testing.T) { Permission: tc.startPermission, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -977,7 +977,7 @@ func TestLookupSubjects(t *testing.T) { OptionalSubjectRelation: tc.subjectRelation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -1025,7 +1025,7 @@ func TestCheckWithCaveats(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "caveatedplan"), @@ -1137,7 +1137,7 @@ func TestCheckWithCaveatErrors(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "firstdoc"), @@ -1190,7 +1190,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request := &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1236,7 +1236,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request = &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1309,7 +1309,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1354,7 +1354,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1399,7 +1399,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1473,7 +1473,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1512,7 +1512,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1658,7 +1658,7 @@ func TestLookupResourcesWithCursors(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, OptionalLimit: uintLimit, @@ -1727,7 +1727,7 @@ func TestLookupResourcesDeduplication(t *testing.T) { Subject: sub("user", "tom", ""), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }) diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index c5fb52f354..547401aa5a 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -378,8 +378,13 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ writeUpdateCounter.WithLabelValues(v1.RelationshipUpdate_Operation_name[int32(kind)]).Observe(float64(count)) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.WriteRelationshipsResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } @@ -496,8 +501,13 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del return nil, ps.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.DeleteRelationshipsResponse{ - DeletedAt: zedtoken.MustNewFromRevision(revision), + DeletedAt: zedToken, DeletionProgress: deletionProgress, }, nil } diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 928e60f977..97b5a062cf 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -302,7 +302,7 @@ func TestReadRelationships(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: tc.filter, @@ -1267,7 +1267,7 @@ func TestDeleteRelationships(t *testing.T) { } require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(tc.deleted), readAll(require, client, resp.DeletedAt)) @@ -1364,7 +1364,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { headRev, err := ds.HeadRevision(context.Background()) require.NoError(err) - beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) resp, err := client.DeleteRelationships(context.Background(), &v1.DeleteRelationshipsRequest{ RelationshipFilter: &v1.RelationshipFilter{ @@ -1375,7 +1375,10 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { }) require.NoError(err) - afterDelete := readOfType(require, "document", client, resp.DeletedAt) + headRev, err = ds.HeadRevision(context.Background()) + require.NoError(err) + + afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) require.LessOrEqual(len(beforeDelete)-len(afterDelete), batchSize) if i == 0 { @@ -1386,7 +1389,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(expected), readAll(require, client, resp.DeletedAt)) @@ -1491,7 +1494,7 @@ func TestWriteRelationshipsWithMetadata(t *testing.T) { require.NoError(err) - beforeWriteToken := zedtoken.MustNewFromRevision(beforeWriteRev) + beforeWriteToken := zedtoken.MustNewFromRevisionForTesting(beforeWriteRev) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -1750,7 +1753,7 @@ func TestReadRelationshipsInvalidCursor(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: &v1.RelationshipFilter{ diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index 5f11849042..193a7ac6a9 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -99,9 +99,14 @@ func (ss *schemaServer) ReadSchema(ctx context.Context, _ *v1.ReadSchemaRequest) DispatchCount: dispatchCount, }) + zedToken, err := zedtoken.NewFromRevision(ctx, headRevision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.ReadSchemaResponse{ SchemaText: schemaText, - ReadAt: zedtoken.MustNewFromRevision(headRevision), + ReadAt: zedToken, }, nil } @@ -152,7 +157,12 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque return nil, ss.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.WriteSchemaResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } diff --git a/internal/services/v1/watch.go b/internal/services/v1/watch.go index ab0d839595..02e49c893c 100644 --- a/internal/services/v1/watch.go +++ b/internal/services/v1/watch.go @@ -51,11 +51,15 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS var afterRevision datastore.Revision if req.OptionalStartCursor != nil && req.OptionalStartCursor.Token != "" { - decodedRevision, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) + decodedRevision, tokenStatus, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) if err != nil { return status.Errorf(codes.InvalidArgument, "failed to decode start revision: %s", err) } + if tokenStatus == zedtoken.StatusMismatchedDatastoreID { + return status.Errorf(codes.InvalidArgument, "start revision was generated by a different datastore") + } + afterRevision = decodedRevision } else { var err error @@ -94,6 +98,11 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if ok { filtered := filterUpdates(objectTypes, filters, update.RelationshipChanges) if len(filtered) > 0 { + zedToken, err := zedtoken.NewFromRevision(ctx, update.Revision, ds) + if err != nil { + return err + } + converted, err := tuple.UpdatesToV1RelationshipUpdates(filtered) if err != nil { return status.Errorf(codes.Internal, "failed to convert updates: %s", err) @@ -101,7 +110,7 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if err := stream.Send(&v1.WatchResponse{ Updates: converted, - ChangesThrough: zedtoken.MustNewFromRevision(update.Revision), + ChangesThrough: zedToken, OptionalTransactionMetadata: update.Metadata, }); err != nil { return status.Errorf(codes.Canceled, "watch canceled by user: %s", err) diff --git a/internal/services/v1/watch_test.go b/internal/services/v1/watch_test.go index 7b9ea41651..84ab930ca1 100644 --- a/internal/services/v1/watch_test.go +++ b/internal/services/v1/watch_test.go @@ -210,7 +210,7 @@ func TestWatch(t *testing.T) { t.Cleanup(cleanup) client := v1.NewWatchServiceClient(conn) - cursor := zedtoken.MustNewFromRevision(revision) + cursor := zedtoken.MustNewFromRevisionForTesting(revision) if tc.startCursor != nil { cursor = tc.startCursor } diff --git a/internal/testserver/server.go b/internal/testserver/server.go index 1b8fd6d991..7f629d98b0 100644 --- a/internal/testserver/server.go +++ b/internal/testserver/server.go @@ -105,7 +105,7 @@ func NewTestServerWithConfigAndDatastore(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor("testserver"), + Middleware: consistency.UnaryServerInterceptor("testserver", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -128,7 +128,7 @@ func NewTestServerWithConfigAndDatastore(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor("testserver"), + Middleware: consistency.StreamServerInterceptor("testserver", consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go index 97d5a613cb..b19e0af1ad 100644 --- a/pkg/cmd/serve.go +++ b/pkg/cmd/serve.go @@ -112,6 +112,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error { apiFlags.Uint32Var(&config.MaxDeleteRelationshipsLimit, "max-delete-relationships-limit", 1000, "maximum number of relationships that can be deleted in a single request") apiFlags.Uint32Var(&config.MaxLookupResourcesLimit, "max-lookup-resources-limit", 1000, "maximum number of resources that can be looked up in a single request") apiFlags.Uint32Var(&config.MaxBulkExportRelationshipsLimit, "max-bulk-export-relationships-limit", 10_000, "maximum number of relationships that can be exported in a single request") + apiFlags.StringVar(&config.MismatchZedTokenBehavior, "mismatch-zed-token-behavior", "full-consistency", "behavior when a mismatched zedtoken is encountered. One of: full-consistency (treat as a full-consistency call), min-latency (treat as a min-latency call), error (return an error). defaults to full-consistency for safety.") datastoreFlags := nfs.FlagSet(BoldBlue("Datastore")) // Flags for the datastore diff --git a/pkg/cmd/server/defaults.go b/pkg/cmd/server/defaults.go index b1aff63b7d..228e22ac79 100644 --- a/pkg/cmd/server/defaults.go +++ b/pkg/cmd/server/defaults.go @@ -181,14 +181,15 @@ const ( //go:generate go run github.com/ecordell/optgen -output zz_generated.middlewareoption.go . MiddlewareOption type MiddlewareOption struct { - Logger zerolog.Logger `debugmap:"hidden"` - AuthFunc grpcauth.AuthFunc `debugmap:"hidden"` - EnableVersionResponse bool `debugmap:"visible"` - DispatcherForMiddleware dispatch.Dispatcher `debugmap:"hidden"` - EnableRequestLog bool `debugmap:"visible"` - EnableResponseLog bool `debugmap:"visible"` - DisableGRPCHistogram bool `debugmap:"visible"` - MiddlewareServiceLabel string `debugmap:"visible"` + Logger zerolog.Logger `debugmap:"hidden"` + AuthFunc grpcauth.AuthFunc `debugmap:"hidden"` + EnableVersionResponse bool `debugmap:"visible"` + DispatcherForMiddleware dispatch.Dispatcher `debugmap:"hidden"` + EnableRequestLog bool `debugmap:"visible"` + EnableResponseLog bool `debugmap:"visible"` + DisableGRPCHistogram bool `debugmap:"visible"` + MiddlewareServiceLabel string `debugmap:"visible"` + MismatchingZedTokenOption consistencymw.MismatchingTokenOption `debugmap:"visible"` unaryDatastoreMiddleware *ReferenceableMiddleware[grpc.UnaryServerInterceptor] `debugmap:"hidden"` streamDatastoreMiddleware *ReferenceableMiddleware[grpc.StreamServerInterceptor] `debugmap:"hidden"` @@ -221,6 +222,7 @@ func (m MiddlewareOption) WithDatastoreMiddleware(middleware Middleware) Middlew EnableResponseLog: m.EnableResponseLog, DisableGRPCHistogram: m.DisableGRPCHistogram, MiddlewareServiceLabel: m.MiddlewareServiceLabel, + MismatchingZedTokenOption: m.MismatchingZedTokenOption, unaryDatastoreMiddleware: &unary, streamDatastoreMiddleware: &stream, } @@ -248,6 +250,7 @@ func (m MiddlewareOption) WithDatastore(ds datastore.Datastore) MiddlewareOption EnableResponseLog: m.EnableResponseLog, DisableGRPCHistogram: m.DisableGRPCHistogram, MiddlewareServiceLabel: m.MiddlewareServiceLabel, + MismatchingZedTokenOption: m.MismatchingZedTokenOption, unaryDatastoreMiddleware: &unary, streamDatastoreMiddleware: &stream, } @@ -350,7 +353,7 @@ func DefaultUnaryMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.UnaryS NewUnaryMiddleware(). WithName(DefaultInternalMiddlewareConsistency). - WithInterceptor(consistencymw.UnaryServerInterceptor(opts.MiddlewareServiceLabel)). + WithInterceptor(consistencymw.UnaryServerInterceptor(opts.MiddlewareServiceLabel, opts.MismatchingZedTokenOption)). Done(), NewUnaryMiddleware(). @@ -428,7 +431,7 @@ func DefaultStreamingMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.St NewStreamMiddleware(). WithName(DefaultInternalMiddlewareConsistency). - WithInterceptor(consistencymw.StreamServerInterceptor(opts.MiddlewareServiceLabel)). + WithInterceptor(consistencymw.StreamServerInterceptor(opts.MiddlewareServiceLabel, opts.MismatchingZedTokenOption)). Done(), NewStreamMiddleware(). diff --git a/pkg/cmd/server/defaults_test.go b/pkg/cmd/server/defaults_test.go index 5b3ea59f60..c83ec5dee2 100644 --- a/pkg/cmd/server/defaults_test.go +++ b/pkg/cmd/server/defaults_test.go @@ -12,6 +12,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/dispatch" "github.com/authzed/spicedb/internal/middleware/pertoken" + "github.com/authzed/spicedb/pkg/middleware/consistency" ) func TestWithDatastore(t *testing.T) { @@ -30,6 +31,7 @@ func TestWithDatastore(t *testing.T) { true, false, "service", + consistency.TreatMismatchingTokensAsError, nil, nil, } @@ -48,6 +50,7 @@ func TestWithDatastore(t *testing.T) { require.Equal(t, opts.EnableResponseLog, withDS.EnableResponseLog) require.Equal(t, opts.DisableGRPCHistogram, withDS.DisableGRPCHistogram) require.Equal(t, opts.MiddlewareServiceLabel, withDS.MiddlewareServiceLabel) + require.Equal(t, opts.MismatchingZedTokenOption, withDS.MismatchingZedTokenOption) _, authError := withDS.AuthFunc(context.Background()) require.Error(t, authError) @@ -70,6 +73,7 @@ func TestWithDatastoreMiddleware(t *testing.T) { true, false, "anotherservice", + consistency.TreatMismatchingTokensAsError, nil, nil, } @@ -87,6 +91,7 @@ func TestWithDatastoreMiddleware(t *testing.T) { require.Equal(t, opts.EnableResponseLog, withDS.EnableResponseLog) require.Equal(t, opts.DisableGRPCHistogram, withDS.DisableGRPCHistogram) require.Equal(t, opts.MiddlewareServiceLabel, withDS.MiddlewareServiceLabel) + require.Equal(t, opts.MismatchingZedTokenOption, withDS.MismatchingZedTokenOption) _, authError := withDS.AuthFunc(context.Background()) require.Error(t, authError) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 3ded6efb7a..5dfa3a6009 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -43,6 +43,7 @@ import ( datastorecfg "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/middleware/consistency" "github.com/authzed/spicedb/pkg/middleware/requestid" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -121,6 +122,7 @@ type Config struct { MaxBulkExportRelationshipsLimit uint32 `debugmap:"visible"` EnableExperimentalLookupResources bool `debugmap:"visible"` EnableExperimentalRelationshipExpiration bool `debugmap:"visible"` + MismatchZedTokenBehavior string `debugmap:"visible"` // Additional Services MetricsAPI util.HTTPServerConfig `debugmap:"visible"` @@ -385,6 +387,24 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { serverName = "spicedb" } + var mismatchZedTokenOption consistency.MismatchingTokenOption + switch c.MismatchZedTokenBehavior { + case "": + fallthrough + + case "full-consistency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsFullConsistency + + case "min-latency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsMinLatency + + case "error": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsError + + default: + return nil, fmt.Errorf("unknown mismatched zedtoken behavior: %s", c.MismatchZedTokenBehavior) + } + opts := MiddlewareOption{ log.Logger, c.GRPCAuthFunc, @@ -394,6 +414,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { c.EnableResponseLogs, c.DisableGRPCLatencyHistogram, serverName, + mismatchZedTokenOption, nil, nil, } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index b36e405da5..c5b6a5fb92 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -11,6 +11,7 @@ import ( "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" + "github.com/authzed/spicedb/pkg/middleware/consistency" "github.com/authzed/spicedb/pkg/testutil" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" @@ -231,7 +232,7 @@ func TestModifyUnaryMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", nil, nil} + opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, nil, nil} opt = opt.WithDatastore(nil) defaultMw, err := DefaultUnaryMiddleware(opt) @@ -259,7 +260,7 @@ func TestModifyStreamingMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", nil, nil} + opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, nil, nil} opt = opt.WithDatastore(nil) defaultMw, err := DefaultStreamingMiddleware(opt) diff --git a/pkg/cmd/server/zz_generated.middlewareoption.go b/pkg/cmd/server/zz_generated.middlewareoption.go index 79ff8c9f56..ed1dd3430c 100644 --- a/pkg/cmd/server/zz_generated.middlewareoption.go +++ b/pkg/cmd/server/zz_generated.middlewareoption.go @@ -3,6 +3,7 @@ package server import ( dispatch "github.com/authzed/spicedb/internal/dispatch" + consistency "github.com/authzed/spicedb/pkg/middleware/consistency" defaults "github.com/creasty/defaults" helpers "github.com/ecordell/optgen/helpers" auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" @@ -41,6 +42,7 @@ func (m *MiddlewareOption) ToOption() MiddlewareOptionOption { to.EnableResponseLog = m.EnableResponseLog to.DisableGRPCHistogram = m.DisableGRPCHistogram to.MiddlewareServiceLabel = m.MiddlewareServiceLabel + to.MismatchingZedTokenOption = m.MismatchingZedTokenOption to.unaryDatastoreMiddleware = m.unaryDatastoreMiddleware to.streamDatastoreMiddleware = m.streamDatastoreMiddleware } @@ -54,6 +56,7 @@ func (m MiddlewareOption) DebugMap() map[string]any { debugMap["EnableResponseLog"] = helpers.DebugValue(m.EnableResponseLog, false) debugMap["DisableGRPCHistogram"] = helpers.DebugValue(m.DisableGRPCHistogram, false) debugMap["MiddlewareServiceLabel"] = helpers.DebugValue(m.MiddlewareServiceLabel, false) + debugMap["MismatchingZedTokenOption"] = helpers.DebugValue(m.MismatchingZedTokenOption, false) return debugMap } @@ -128,3 +131,10 @@ func WithMiddlewareServiceLabel(middlewareServiceLabel string) MiddlewareOptionO m.MiddlewareServiceLabel = middlewareServiceLabel } } + +// WithMismatchingZedTokenOption returns an option that can set MismatchingZedTokenOption on a MiddlewareOption +func WithMismatchingZedTokenOption(mismatchingZedTokenOption consistency.MismatchingTokenOption) MiddlewareOptionOption { + return func(m *MiddlewareOption) { + m.MismatchingZedTokenOption = mismatchingZedTokenOption + } +} diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index 8e2dfaddef..432a3dd1b7 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -89,6 +89,7 @@ func (c *Config) ToOption() ConfigOption { to.MaxBulkExportRelationshipsLimit = c.MaxBulkExportRelationshipsLimit to.EnableExperimentalLookupResources = c.EnableExperimentalLookupResources to.EnableExperimentalRelationshipExpiration = c.EnableExperimentalRelationshipExpiration + to.MismatchZedTokenBehavior = c.MismatchZedTokenBehavior to.MetricsAPI = c.MetricsAPI to.UnaryMiddlewareModification = c.UnaryMiddlewareModification to.StreamingMiddlewareModification = c.StreamingMiddlewareModification @@ -158,6 +159,7 @@ func (c Config) DebugMap() map[string]any { debugMap["MaxBulkExportRelationshipsLimit"] = helpers.DebugValue(c.MaxBulkExportRelationshipsLimit, false) debugMap["EnableExperimentalLookupResources"] = helpers.DebugValue(c.EnableExperimentalLookupResources, false) debugMap["EnableExperimentalRelationshipExpiration"] = helpers.DebugValue(c.EnableExperimentalRelationshipExpiration, false) + debugMap["MismatchZedTokenBehavior"] = helpers.DebugValue(c.MismatchZedTokenBehavior, false) debugMap["MetricsAPI"] = helpers.DebugValue(c.MetricsAPI, false) debugMap["SilentlyDisableTelemetry"] = helpers.DebugValue(c.SilentlyDisableTelemetry, false) debugMap["TelemetryCAOverridePath"] = helpers.DebugValue(c.TelemetryCAOverridePath, false) @@ -570,6 +572,13 @@ func WithEnableExperimentalRelationshipExpiration(enableExperimentalRelationship } } +// WithMismatchZedTokenBehavior returns an option that can set MismatchZedTokenBehavior on a Config +func WithMismatchZedTokenBehavior(mismatchZedTokenBehavior string) ConfigOption { + return func(c *Config) { + c.MismatchZedTokenBehavior = mismatchZedTokenBehavior + } +} + // WithMetricsAPI returns an option that can set MetricsAPI on a Config func WithMetricsAPI(metricsAPI util.HTTPServerConfig) ConfigOption { return func(c *Config) { diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go index 4233efaa5c..67c3f1ef5c 100644 --- a/pkg/cursor/cursor.go +++ b/pkg/cursor/cursor.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "encoding/base64" "errors" "fmt" @@ -11,6 +12,7 @@ import ( dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" impl "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/zedtoken" ) // Encode converts a decoded cursor to its opaque version. @@ -109,25 +111,39 @@ func DecodeToDispatchCursor(encoded *v1.Cursor, callAndParameterHash string) (*d // DecodeToDispatchRevision decodes an encoded API cursor into an internal dispatch revision. // NOTE: this method does *not* verify the caller's method signature. -func DecodeToDispatchRevision(encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, error) { +func DecodeToDispatchRevision(ctx context.Context, encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, zedtoken.TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return nil, err + return nil, zedtoken.StatusUnknown, err } v1decoded := decoded.GetV1() if v1decoded == nil { - return nil, ErrNilCursor + return nil, zedtoken.StatusUnknown, ErrNilCursor + } + + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, zedtoken.StatusUnknown, fmt.Errorf(errEncodeError, err) } parsed, err := ds.RevisionFromString(v1decoded.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, zedtoken.StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if v1decoded.DatastoreUniqueId == "" { + return parsed, zedtoken.StatusLegacyEmptyDatastoreID, nil + } + + if v1decoded.DatastoreUniqueId != datastoreUniqueID { + return parsed, zedtoken.StatusMismatchedDatastoreID, nil } - return parsed, nil + return parsed, zedtoken.StatusValid, nil } type revisionDecoder interface { + UniqueID(_ context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/cursor/cursor_test.go b/pkg/cursor/cursor_test.go index 488c3d56bf..115756f303 100644 --- a/pkg/cursor/cursor_test.go +++ b/pkg/cursor/cursor_test.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "fmt" "testing" @@ -59,7 +60,7 @@ func TestEncodeDecode(t *testing.T) { require.Equal(tc.sections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(encoded, revisions.CommonDecoder{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -137,7 +138,7 @@ func TestDecode(t *testing.T) { require.NotNil(decoded) require.Equal(testCase.expectedSections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(&v1.Cursor{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), &v1.Cursor{ Token: testCase.token, }, revisions.CommonDecoder{ Kind: revisions.TransactionID, diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index 59738b0551..6a846a1518 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -158,11 +158,11 @@ func (dc *DevContext) RunV1InMemoryService() (*grpc.ClientConn, func(), error) { s := grpc.NewServer( grpc.ChainUnaryInterceptor( datastoremw.UnaryServerInterceptor(dc.Datastore), - consistency.UnaryServerInterceptor("development"), + consistency.UnaryServerInterceptor("development", consistency.TreatMismatchingTokensAsError), ), grpc.ChainStreamInterceptor( datastoremw.StreamServerInterceptor(dc.Datastore), - consistency.StreamServerInterceptor("development"), + consistency.StreamServerInterceptor("development", consistency.TreatMismatchingTokensAsError), ), ) ps := v1svc.NewPermissionsServer(dc.Dispatcher, v1svc.PermissionsServerConfig{ diff --git a/pkg/middleware/consistency/consistency.go b/pkg/middleware/consistency/consistency.go index 8f75a19252..d1da65131a 100644 --- a/pkg/middleware/consistency/consistency.go +++ b/pkg/middleware/consistency/consistency.go @@ -18,6 +18,7 @@ import ( "github.com/authzed/spicedb/internal/services/shared" "github.com/authzed/spicedb/pkg/cursor" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/zedtoken" ) @@ -28,6 +29,27 @@ var ConsistencyCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "Count of the consistencies used per request", }, []string{"method", "source", "service"}) +// MismatchingTokenOption is the option specifying the behavior of the consistency middleware +// when a ZedToken provided references a different datastore instance than the current +// datastore instance. +type MismatchingTokenOption int + +const ( + // TreatMismatchingTokensAsFullConsistency specifies that the middleware should treat + // a ZedToken that references a different datastore instance as a request for full + // consistency. + TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota + + // TreatMismatchingTokensAsMinLatency specifies that the middleware should treat + // a ZedToken that references a different datastore instance as a request for min + // latency. + TreatMismatchingTokensAsMinLatency + + // TreatMismatchingTokensAsError specifies that the middleware should raise an error + // when a ZedToken that references a different datastore instance is provided. + TreatMismatchingTokensAsError +) + type hasConsistency interface{ GetConsistency() *v1.Consistency } type hasOptionalCursor interface{ GetOptionalCursor() *v1.Cursor } @@ -55,7 +77,17 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, handle := c.(*revisionHandle) rev := handle.revision if rev != nil { - return rev, zedtoken.MustNewFromRevision(rev), nil + ds := datastoremw.FromContext(ctx) + if ds == nil { + return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore") + } + + zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds) + if err != nil { + return nil, nil, err + } + + return rev, zedToken, nil } } @@ -64,10 +96,10 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, // AddRevisionToContext adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string) error { +func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string, option MismatchingTokenOption) error { switch req := req.(type) { case hasConsistency: - return addRevisionToContextFromConsistency(ctx, req, ds, serviceLabel) + return addRevisionToContextFromConsistency(ctx, req, ds, serviceLabel, option) default: return nil } @@ -75,7 +107,7 @@ func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Dat // addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, serviceLabel string) error { +func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, serviceLabel string, option MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil @@ -93,7 +125,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency ConsistencyCounter.WithLabelValues("snapshot", "cursor", serviceLabel).Inc() } - requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds) + requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -137,7 +169,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency case consistency.GetAtLeastAsFresh() != nil: // At least as fresh as: Pick one of the datastore's revision and that specified, which // ever is later. - picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds) + picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -159,11 +191,15 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency ConsistencyCounter.WithLabelValues("snapshot", "request", serviceLabel).Inc() } - requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) + requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) if err != nil { return errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + return fmt.Errorf("ZedToken specified references a different datastore instance but at-exact-snapshot was requested") + } + err = ds.CheckRevision(ctx, requestedRev) if err != nil { return rewriteDatastoreError(ctx, err) @@ -187,7 +223,7 @@ var bypassServiceWhitelist = map[string]struct{}{ // UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func UnaryServerInterceptor(serviceLabel string) grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(serviceLabel string, option MismatchingTokenOption) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { @@ -196,7 +232,7 @@ func UnaryServerInterceptor(serviceLabel string) grpc.UnaryServerInterceptor { } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := AddRevisionToContext(newCtx, req, ds, serviceLabel); err != nil { + if err := AddRevisionToContext(newCtx, req, ds, serviceLabel, option); err != nil { return nil, err } @@ -206,14 +242,14 @@ func UnaryServerInterceptor(serviceLabel string) grpc.UnaryServerInterceptor { // StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func StreamServerInterceptor(serviceLabel string) grpc.StreamServerInterceptor { +func StreamServerInterceptor(serviceLabel string, option MismatchingTokenOption) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, AddRevisionToContext} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, option, AddRevisionToContext} return handler(srv, wrapper) } } @@ -222,7 +258,8 @@ type recvWrapper struct { grpc.ServerStream ctx context.Context serviceLabel string - handler func(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string) error + option MismatchingTokenOption + handler func(context.Context, interface{}, datastore.Datastore, string, MismatchingTokenOption) error } func (s *recvWrapper) Context() context.Context { return s.ctx } @@ -232,12 +269,12 @@ func (s *recvWrapper) RecvMsg(m interface{}) error { return err } ds := datastoremw.MustFromContext(s.ctx) - return s.handler(s.ctx, m, ds, s.serviceLabel) + return s.handler(s.ctx, m, ds, s.serviceLabel, s.option) } // pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most // recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise. -func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) { +func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) { // Calculate a revision as we see fit databaseRev, err := ds.OptimizedRevision(ctx) if err != nil { @@ -245,11 +282,35 @@ func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore. } if requested != nil { - requestedRev, err := zedtoken.DecodeRevision(requested, ds) + requestedRev, status, err := zedtoken.DecodeRevision(requested, ds) if err != nil { return datastore.NoRevision, false, errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + switch option { + case TreatMismatchingTokensAsFullConsistency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references a different datastore instance and SpiceDB is configured to treat this as a full consistency request") + headRev, err := ds.HeadRevision(ctx) + if err != nil { + return datastore.NoRevision, false, err + } + + return headRev, false, nil + + case TreatMismatchingTokensAsMinLatency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references a different datastore instance and SpiceDB is configured to treat this as a min latency request") + return databaseRev, false, nil + + case TreatMismatchingTokensAsError: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references a different datastore instance and SpiceDB is configured to raise an error in this scenario") + return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references a different datastore instance and SpiceDB is configured to raise an error in this scenario") + + default: + return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option) + } + } + if databaseRev.GreaterThan(requestedRev) { return databaseRev, false, nil } diff --git a/pkg/middleware/consistency/consistency_test.go b/pkg/middleware/consistency/consistency_test.go index a5f0b4f516..674f0a38ed 100644 --- a/pkg/middleware/consistency/consistency_test.go +++ b/pkg/middleware/consistency/consistency_test.go @@ -10,6 +10,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" "github.com/authzed/spicedb/internal/datastore/revisions" + datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/pkg/cursor" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/zedtoken" @@ -29,7 +30,9 @@ func TestAddRevisionToContextNoneSupplied(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) - err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, "somelabel") + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -46,13 +49,15 @@ func TestAddRevisionToContextMinimizeLatency(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{ MinimizeLatency: true, }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -69,13 +74,15 @@ func TestAddRevisionToContextFullyConsistent(t *testing.T) { ds.On("HeadRevision").Return(head, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_FullyConsistent{ FullyConsistent: true, }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -93,13 +100,15 @@ func TestAddRevisionToContextAtLeastAsFresh(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(exact), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -117,13 +126,15 @@ func TestAddRevisionToContextAtValidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -141,13 +152,15 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", zero.String()).Return(zero, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(zero), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(zero), }, }, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.Error(err) ds.AssertExpectations(t) } @@ -155,7 +168,10 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { func TestAddRevisionToContextNoConsistencyAPI(t *testing.T) { require := require.New(t) + ds := &proxy_test.MockDatastore{} + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) _, _, err := RevisionFromContext(updated) require.Error(err) @@ -174,14 +190,16 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { // revision in context is at `exact` updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, OptionalCursor: cursor, - }, ds, "somelabel") + }, ds, "somelabel", TreatMismatchingTokensAsError) require.NoError(err) // ensure we get back `optimized` from the cursor @@ -191,3 +209,153 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { require.True(optimized.Equal(rev)) ds.AssertExpectations(t) } + +func TestAtExactSnapshotWithMismatchedToken(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore but at-exact-snapshot") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectError(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectMinLatency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsMinLatency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(optimized.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectFullConsistency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("HeadRevision").Return(head, nil).Once() + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsFullConsistency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(head.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAddRevisionToContextAtLeastAsFreshMatchingIDs(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() + + ds.CurrentUniqueID = "foo" + + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), + }, + }, + }, ds, "somelabel", TreatMismatchingTokensAsError) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(exact.Equal(rev)) + ds.AssertExpectations(t) +} diff --git a/pkg/middleware/consistency/forcefull.go b/pkg/middleware/consistency/forcefull.go index 0ec88f7a4c..1d196f0a60 100644 --- a/pkg/middleware/consistency/forcefull.go +++ b/pkg/middleware/consistency/forcefull.go @@ -21,7 +21,7 @@ func ForceFullConsistencyUnaryServerInterceptor(serviceLabel string) grpc.UnaryS } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := setFullConsistencyRevisionToContext(newCtx, req, ds, serviceLabel); err != nil { + if err := setFullConsistencyRevisionToContext(newCtx, req, ds, serviceLabel, TreatMismatchingTokensAsFullConsistency); err != nil { return nil, err } @@ -38,12 +38,12 @@ func ForceFullConsistencyStreamServerInterceptor(serviceLabel string) grpc.Strea return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, setFullConsistencyRevisionToContext} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), serviceLabel, TreatMismatchingTokensAsFullConsistency, setFullConsistencyRevisionToContext} return handler(srv, wrapper) } } -func setFullConsistencyRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string) error { +func setFullConsistencyRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, serviceLabel string, _ MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil diff --git a/pkg/proto/impl/v1/impl.pb.go b/pkg/proto/impl/v1/impl.pb.go index 67ee033f1d..6c7a448b5d 100644 --- a/pkg/proto/impl/v1/impl.pb.go +++ b/pkg/proto/impl/v1/impl.pb.go @@ -402,6 +402,9 @@ type V1Cursor struct { DispatchVersion uint32 `protobuf:"varint,4,opt,name=dispatch_version,json=dispatchVersion,proto3" json:"dispatch_version,omitempty"` // flags are flags set by the API caller. Flags map[string]string `protobuf:"bytes,5,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + DatastoreUniqueId string `protobuf:"bytes,6,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *V1Cursor) Reset() { @@ -471,6 +474,13 @@ func (x *V1Cursor) GetFlags() map[string]string { return nil } +func (x *V1Cursor) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + type DocComment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -814,6 +824,9 @@ type DecodedZedToken_V1ZedToken struct { unknownFields protoimpl.UnknownFields Revision string `protobuf:"bytes,1,opt,name=revision,proto3" json:"revision,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + DatastoreUniqueId string `protobuf:"bytes,2,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *DecodedZedToken_V1ZedToken) Reset() { @@ -855,6 +868,13 @@ func (x *DecodedZedToken_V1ZedToken) GetRevision() string { return "" } +func (x *DecodedZedToken_V1ZedToken) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + var File_impl_v1_impl_proto protoreflect.FileDescriptor var file_impl_v1_impl_proto_rawDesc = []byte{ @@ -884,7 +904,7 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x08, 0x56, 0x32, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x82, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, + 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0xb2, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x55, 0x0a, 0x14, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x76, 0x31, 0x5f, 0x7a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, @@ -897,15 +917,18 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x65, 0x6e, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x1a, 0x26, 0x0a, 0x08, 0x56, 0x31, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, - 0x1a, 0x28, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, + 0x1a, 0x58, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, + 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x45, 0x0a, 0x0d, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x23, 0x0a, 0x02, 0x76, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, - 0x6f, 0x66, 0x22, 0x94, 0x02, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, + 0x6f, 0x66, 0x22, 0xc4, 0x02, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x73, @@ -918,7 +941,10 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x2e, 0x46, 0x6c, - 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x1a, + 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x12, + 0x2e, 0x0a, 0x13, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, + 0x71, 0x75, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x1a, 0x38, 0x0a, 0x0a, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, diff --git a/pkg/proto/impl/v1/impl.pb.validate.go b/pkg/proto/impl/v1/impl.pb.validate.go index dea8479bb2..a52eece1a5 100644 --- a/pkg/proto/impl/v1/impl.pb.validate.go +++ b/pkg/proto/impl/v1/impl.pb.validate.go @@ -735,6 +735,8 @@ func (m *V1Cursor) validate(all bool) error { // no validation rules for Flags + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return V1CursorMultiError(errors) } @@ -1591,6 +1593,8 @@ func (m *DecodedZedToken_V1ZedToken) validate(all bool) error { // no validation rules for Revision + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return DecodedZedToken_V1ZedTokenMultiError(errors) } diff --git a/pkg/proto/impl/v1/impl_vtproto.pb.go b/pkg/proto/impl/v1/impl_vtproto.pb.go index 95e368513f..074fd1534c 100644 --- a/pkg/proto/impl/v1/impl_vtproto.pb.go +++ b/pkg/proto/impl/v1/impl_vtproto.pb.go @@ -154,6 +154,7 @@ func (m *DecodedZedToken_V1ZedToken) CloneVT() *DecodedZedToken_V1ZedToken { } r := new(DecodedZedToken_V1ZedToken) r.Revision = m.Revision + r.DatastoreUniqueId = m.DatastoreUniqueId if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -242,6 +243,7 @@ func (m *V1Cursor) CloneVT() *V1Cursor { r.Revision = m.Revision r.CallAndParametersHash = m.CallAndParametersHash r.DispatchVersion = m.DispatchVersion + r.DatastoreUniqueId = m.DatastoreUniqueId if rhs := m.Sections; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -549,6 +551,9 @@ func (this *DecodedZedToken_V1ZedToken) EqualVT(that *DecodedZedToken_V1ZedToken if this.Revision != that.Revision { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -726,6 +731,9 @@ func (this *V1Cursor) EqualVT(that *V1Cursor) bool { return false } } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1152,6 +1160,13 @@ func (m *DecodedZedToken_V1ZedToken) MarshalToSizedBufferVT(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x12 + } if len(m.Revision) > 0 { i -= len(m.Revision) copy(dAtA[i:], m.Revision) @@ -1345,6 +1360,13 @@ func (m *V1Cursor) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x32 + } if len(m.Flags) > 0 { for k := range m.Flags { v := m.Flags[k] @@ -1696,6 +1718,10 @@ func (m *DecodedZedToken_V1ZedToken) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1799,6 +1825,10 @@ func (m *V1Cursor) SizeVT() (n int) { n += mapEntrySize + 1 + protohelpers.SizeOfVarint(uint64(mapEntrySize)) } } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -2440,6 +2470,38 @@ func (m *DecodedZedToken_V1ZedToken) UnmarshalVT(dAtA []byte) error { } m.Revision = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2958,6 +3020,38 @@ func (m *V1Cursor) UnmarshalVT(dAtA []byte) error { } m.Flags[mapkey] = mapvalue iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/zedtoken/zedtoken.go b/pkg/zedtoken/zedtoken.go index 797b3e184b..f655b74565 100644 --- a/pkg/zedtoken/zedtoken.go +++ b/pkg/zedtoken/zedtoken.go @@ -2,6 +2,7 @@ package zedtoken import ( + "context" "encoding/base64" "errors" "fmt" @@ -22,9 +23,32 @@ const ( // zedtoken argument to Decode var ErrNilZedToken = errors.New("zedtoken pointer was nil") -// MustNewFromRevision generates an encoded zedtoken from an integral revision. -func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { - encoded, err := NewFromRevision(revision) +// legacyEmptyDatastoreID is the empty datastore ID for legacy tokens and cursors. +const legacyEmptyDatastoreID = "" + +// TokenStatus is the status of a zedtoken. +type TokenStatus int + +const ( + // StatusUnknown indicates that the status of the zedtoken is unknown. + StatusUnknown TokenStatus = iota + + // StatusLegacyEmptyDatastoreID indicates that the zedtoken is a legacy token + // with an empty datastore ID. + StatusLegacyEmptyDatastoreID + + // StatusValid indicates that the zedtoken is valid. + StatusValid + + // StatusMismatchedDatastoreID indicates that the zedtoken is valid, but the + // datastore ID does not match the current datastore, indicating that the + // token was generated by a different datastore. + StatusMismatchedDatastoreID +) + +// MustNewFromRevisionForTesting generates an encoded zedtoken from an integral revision. +func MustNewFromRevisionForTesting(revision datastore.Revision) *v1.ZedToken { + encoded, err := newFromRevision(revision, legacyEmptyDatastoreID) if err != nil { panic(err) } @@ -32,11 +56,21 @@ func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { } // NewFromRevision generates an encoded zedtoken from an integral revision. -func NewFromRevision(revision datastore.Revision) (*v1.ZedToken, error) { +func NewFromRevision(ctx context.Context, revision datastore.Revision, ds datastore.Datastore) (*v1.ZedToken, error) { + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, fmt.Errorf(errEncodeError, err) + } + + return newFromRevision(revision, datastoreUniqueID) +} + +func newFromRevision(revision datastore.Revision, datastoreUniqueID string) (*v1.ZedToken, error) { toEncode := &zedtoken.DecodedZedToken{ VersionOneof: &zedtoken.DecodedZedToken_V1{ V1: &zedtoken.DecodedZedToken_V1ZedToken{ - Revision: revision.String(), + Revision: revision.String(), + DatastoreUniqueId: datastoreUniqueID, }, }, } @@ -77,10 +111,10 @@ func Decode(encoded *v1.ZedToken) (*zedtoken.DecodedZedToken, error) { } // DecodeRevision converts and extracts the revision from a zedtoken or legacy zookie. -func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, error) { +func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return datastore.NoRevision, err + return datastore.NoRevision, StatusUnknown, err } switch ver := decoded.VersionOneof.(type) { @@ -88,21 +122,36 @@ func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revisio revString := fmt.Sprintf("%d", ver.DeprecatedV1Zookie.Revision) parsed, err := ds.RevisionFromString(revString) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + return parsed, StatusLegacyEmptyDatastoreID, nil case *zedtoken.DecodedZedToken_V1: parsed, err := ds.RevisionFromString(ver.V1.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + + if ver.V1.DatastoreUniqueId == legacyEmptyDatastoreID { + return parsed, StatusLegacyEmptyDatastoreID, nil + } + + datastoreUniqueID, err := ds.UniqueID(context.Background()) + if err != nil { + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if ver.V1.DatastoreUniqueId != datastoreUniqueID { + return parsed, StatusMismatchedDatastoreID, nil + } + + return parsed, StatusValid, nil default: - return datastore.NoRevision, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) } } type revisionDecoder interface { + UniqueID(context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/zedtoken/zedtoken_test.go b/pkg/zedtoken/zedtoken_test.go index 09bbc4d48e..85df77aa4d 100644 --- a/pkg/zedtoken/zedtoken_test.go +++ b/pkg/zedtoken/zedtoken_test.go @@ -41,10 +41,9 @@ func TestZedTokenEncode(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -58,10 +57,9 @@ func TestZedTokenEncodeHLC(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.HybridLogicalClock, }) require.NoError(err) @@ -71,65 +69,92 @@ func TestZedTokenEncodeHLC(t *testing.T) { } var decodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { format: "invalid", token: "abc", expectedRevision: datastore.NoRevision, + expectedStatus: StatusUnknown, expectError: true, }, { format: "V1 Zookie", token: "CAESAA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggB", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggC", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAwiAAg==", expectedRevision: revisions.NewForTransactionID(256), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAIaAwoBMA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMQ==", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMg==", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBNA==", expectedRevision: revisions.NewForTransactionID(4), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, + { + format: "V1 ZedToken with matching datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "someuniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusValid, + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "anotheruniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusMismatchedDatastoreID, + expectError: false, + }, } func TestDecode(t *testing.T) { @@ -139,15 +164,17 @@ func TestDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.TransactionID, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.TransactionID, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", @@ -160,14 +187,17 @@ func TestDecode(t *testing.T) { } var hlcDecodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { - format: "V1 ZedToken", - token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + format: "V1 ZedToken", + token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + expectedStatus: StatusLegacyEmptyDatastoreID, expectedRevision: func() datastore.Revision { r, err := revisions.NewForHLC(decimal.NewFromInt(1621538189028928000)) if err != nil { @@ -175,11 +205,47 @@ var hlcDecodeTests = []struct { } return r }(), + }, + { + format: "V1 ZedToken", + token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + expectedStatus: StatusLegacyEmptyDatastoreID, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), expectError: false, }, { - format: "V1 ZedToken", - token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + format: "V1 ZedToken with matching datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusValid, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "arrrg-6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusMismatchedDatastoreID, expectedRevision: (func() datastore.Revision { v, err := decimal.NewFromString("1693540940373045727.0000000001") if err != nil { @@ -204,15 +270,17 @@ func TestHLCDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.HybridLogicalClock, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.HybridLogicalClock, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", diff --git a/proto/internal/impl/v1/impl.proto b/proto/internal/impl/v1/impl.proto index 836d833910..446f1bc84b 100644 --- a/proto/internal/impl/v1/impl.proto +++ b/proto/internal/impl/v1/impl.proto @@ -33,6 +33,10 @@ message DecodedZedToken { } message V1ZedToken { string revision = 1; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + string datastore_unique_id = 2; } oneof version_oneof { V1Zookie deprecated_v1_zookie = 2; @@ -63,6 +67,10 @@ message V1Cursor { // flags are flags set by the API caller. map flags = 5; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + string datastore_unique_id = 6; } message DocComment {