Skip to content

Commit

Permalink
CBG-4452 avoid checking revpos on attachments
Browse files Browse the repository at this point in the history
- this check did not do anything meaningful.
- remove TestMinRevPosProveAttachment which had a race condition
- switch Blip Tester to only send 20 revs in history
  • Loading branch information
torcolvin committed Feb 13, 2025
1 parent 962b132 commit 8a9d534
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 98 deletions.
5 changes: 1 addition & 4 deletions db/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type AttachmentCallback func(name string, digest string, knownData []byte, meta
// The callback is told whether the attachment body is known to the database, according
// to its digest. If the attachment isn't known, the callback can return data for it, which will
// be added to the metadata as a "data" property.
func (c *DatabaseCollection) ForEachStubAttachment(body Body, minRevpos int, docID string, existingDigests map[string]string, callback AttachmentCallback) error {
func (c *DatabaseCollection) ForEachStubAttachment(body Body, docID string, existingDigests map[string]string, callback AttachmentCallback) error {
atts := GetBodyAttachments(body)
if atts == nil && body[BodyAttachments] != nil {
return base.HTTPErrorf(http.StatusBadRequest, "Invalid _attachments")
Expand All @@ -293,9 +293,6 @@ func (c *DatabaseCollection) ForEachStubAttachment(body Body, minRevpos int, doc
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
}
if meta["data"] == nil {
if revpos, ok := base.ToInt64(meta["revpos"]); revpos < int64(minRevpos) || !ok {
continue
}
digest, ok := meta["digest"].(string)
if !ok {
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
Expand Down
26 changes: 9 additions & 17 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,60 +408,52 @@ func TestForEachStubAttachmentErrors(t *testing.T) {
docID := "foo"
existingDigests := make(map[string]string)
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err := collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err := collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw 400 Invalid _attachments")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))

// Call ForEachStubAttachment with invalid attachment; simulates the error scenario.
doc = `{"_attachments": {"image1.jpeg": "", "image2.jpeg": ""}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw 400 Invalid _attachments")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))

// Check whether the attachment iteration is getting skipped if revpos < minRevpos
callbackCount = 0
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":1}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")
assert.Equal(t, 0, callbackCount)

// Verify the attachment is getting skipped if digest is in existing set
callbackCount = 0
existingDigests["image.jpg"] = "e1a1"
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":2, "digest":"e1a1"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")
assert.Equal(t, 0, callbackCount)

// Verify the attachment is not getting skipped if digest doesn't match existing set
callbackCount = 0
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":2, "digest":"e1a2"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")
assert.Equal(t, 1, callbackCount)

// Check that an attachment stub with no revpos is processed without returning an error.
doc = `{"_attachments": {"image.jpg": {"stub":true}}}`
doc = `{"_attachments": {"image.jpg": {"stub":true,"digest":"e1a1"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 2, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")

// Should throw an invalid attachment error if the digest is not a valid string or is empty.
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":1, "digest":true}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw 400 Invalid attachments")
assert.Contains(t, err.Error(), strconv.Itoa(http.StatusBadRequest))

// Call ForEachStubAttachment with some bad digest value. Internally it should throw a missing
// document error and invoke the callback function.
doc = `{"_attachments": {"image.jpg": {"stub":true, "revpos":1, "digest":"9304cdd066efa64f78387e9cc9240a70527271bc"}}}`
assert.NoError(t, base.JSONUnmarshal([]byte(doc), &body))
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.NoError(t, err, "It should not throw any error")

// Simulate an error from the callback function; it should return the same error from ForEachStubAttachment.
Expand All @@ -470,7 +462,7 @@ func TestForEachStubAttachmentErrors(t *testing.T) {
callback = func(name string, digest string, knownData []byte, meta map[string]interface{}) ([]byte, error) {
return nil, errors.New("Can't work with this digest value!")
}
err = collection.ForEachStubAttachment(body, 1, docID, existingDigests, callback)
err = collection.ForEachStubAttachment(body, docID, existingDigests, callback)
assert.Error(t, err, "It should throw the actual error")
assert.Contains(t, err.Error(), "Can't work with this digest value!")
}
Expand Down
27 changes: 9 additions & 18 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,22 +1128,18 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
var currentBucketDoc *Document

// Look at attachments with revpos > the last common ancestor's
minRevpos := 1
if len(history) > 0 {
currentDoc, rawDoc, err := bh.collection.GetDocumentWithRaw(bh.loggingCtx, docID, DocUnmarshalSync)
// If we're able to obtain current doc data then we should use the common ancestor generation++ for min revpos
// as we will already have any attachments on the common ancestor so don't need to ask for them.
// Otherwise we'll have to go as far back as we can in the doc history and choose the last entry in there.
if err == nil {
commonAncestor := currentDoc.History.findAncestorFromSet(currentDoc.CurrentRev, history)
minRevpos, _ = ParseRevID(bh.loggingCtx, commonAncestor)
minRevpos++
rawBucketDoc = rawDoc
currentBucketDoc = currentDoc
} else {
minRevpos, _ = ParseRevID(bh.loggingCtx, history[len(history)-1])
}
}
// updatedRevPos is the revpos of the new revision, to be added to attachment metadata if needed for CBL<4.0 compatibility. revpos is not otherwise used.
updatedRevPos, _ := ParseRevID(bh.loggingCtx, revID)

// currentDigests is a map from attachment name to the current bucket doc digest,
// for any attachments on the incoming document that are also on the current bucket doc
Expand All @@ -1159,7 +1155,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if !ok {
// If we don't have this attachment already, ensure incoming revpos is greater than minRevPos, otherwise
// update to ensure it's fetched and uploaded
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID)
bodyAtts[name].(map[string]interface{})["revpos"] = updatedRevPos
continue
}

Expand Down Expand Up @@ -1190,23 +1186,18 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
}

incomingAttachmentRevpos, ok := base.ToInt64(incomingAttachmentMeta["revpos"])
if !ok {
return base.HTTPErrorf(http.StatusBadRequest, "Invalid attachment")
}

// Compare the revpos and attachment digest. If incoming revpos is less than or equal to minRevPos and
// digest is different we need to override the revpos and set it to the current revision to ensure
// the attachment is requested and stored
if int(incomingAttachmentRevpos) <= minRevpos && currentAttachmentDigest != incomingAttachmentDigest {
bodyAtts[name].(map[string]interface{})["revpos"], _ = ParseRevID(bh.loggingCtx, revID)
// the attachment is requested and stored. revpos provided for SG/CBL<4.0 compatibility but is not used.
if currentAttachmentDigest != incomingAttachmentDigest {
bodyAtts[name].(map[string]interface{})["revpos"] = updatedRevPos
}
}

body[BodyAttachments] = bodyAtts
}

if err := bh.downloadOrVerifyAttachments(rq.Sender, body, minRevpos, docID, currentDigests); err != nil {
if err := bh.downloadOrVerifyAttachments(rq.Sender, body, docID, currentDigests); err != nil {
base.ErrorfCtx(bh.loggingCtx, "Error during downloadOrVerifyAttachments for doc %s/%s: %v", base.UD(docID), revID, err)
return err
}
Expand Down Expand Up @@ -1472,8 +1463,8 @@ func (bh *blipHandler) sendProveAttachment(sender *blip.Sender, docID, name, dig

// For each attachment in the revision, makes sure it's in the database, asking the client to
// upload it if necessary. This method blocks until all the attachments have been processed.
func (bh *blipHandler) downloadOrVerifyAttachments(sender *blip.Sender, body Body, minRevpos int, docID string, currentDigests map[string]string) error {
return bh.collection.ForEachStubAttachment(body, minRevpos, docID, currentDigests,
func (bh *blipHandler) downloadOrVerifyAttachments(sender *blip.Sender, body Body, docID string, currentDigests map[string]string) error {
return bh.collection.ForEachStubAttachment(body, docID, currentDigests,
func(name string, digest string, knownData []byte, meta map[string]interface{}) ([]byte, error) {
// Request attachment if we don't have it
if knownData == nil {
Expand Down
59 changes: 0 additions & 59 deletions rest/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2363,65 +2363,6 @@ func TestPushUnknownAttachmentAsStub(t *testing.T) {
})
}

// TestMinRevPosWorkToAvoidUnnecessaryProveAttachment writes a document with an inline attachment through
// the RestTester, pulls it to a blip tester client, and then pushes 50 client-side revisions whose
// attachment stub references revpos 1: 25 created before a one-shot push and 25 created while a continuous
// push is running. After each batch it asserts that the push replication's ProveAttachment stat is
// unchanged from the value captured before the first push.
func TestMinRevPosWorkToAvoidUnnecessaryProveAttachment(t *testing.T) {
rtConfig := &RestTesterConfig{
GuestEnabled: true,
DatabaseConfig: &DatabaseConfig{
DbConfig: DbConfig{
AllowConflicts: base.BoolPtr(true),
},
},
}

btcRunner := NewBlipTesterClientRunner(t)
const docID = "doc"

// The runner invokes this body with a set of supported BLIP protocols, which is passed to the client options.
btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTester(t, rtConfig)
defer rt.Close()

opts := BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols}
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &opts)
defer btc.Close()

btcRunner.StartPull(btc.id)

// Write an initial rev with attachment data
initialVersion := btc.rt.PutDoc(docID, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)

// Replicate data to client and ensure doc arrives
btc.rt.WaitForPendingChanges()
btcRunner.WaitForVersion(btc.id, docID, initialVersion)

// Create a set of revisions before we start the replicator to ensure there's a significant amount of history to push
version := initialVersion
for i := 0; i < 25; i++ {
version = btcRunner.AddRev(btc.id, docID, &version, []byte(`{"update_count":`+strconv.Itoa(i)+`,"_attachments": {"hello.txt": {"revpos":1,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`))
}

// Note this references revpos 1 and therefore SGW has it - Shouldn't need proveAttachment, even when we replicate it
// Capture the stat before pushing so both later assertions compare against the same baseline.
proveAttachmentBefore := btc.pushReplication.replicationStats.ProveAttachment.Value()
btcRunner.StartPushWithOpts(btc.id, BlipTesterPushOptions{Continuous: false})
rt.WaitForVersion(docID, version)

proveAttachmentAfter := btc.pushReplication.replicationStats.ProveAttachment.Value()
assert.Equal(t, proveAttachmentBefore, proveAttachmentAfter)

// start another push to run in the background from where we last left off
latestSeq := btcRunner.SingleCollection(btc.id).lastSeq()
btcRunner.StartPushWithOpts(btc.id, BlipTesterPushOptions{Continuous: true, Since: strconv.Itoa(int(latestSeq))})

// Push another bunch of history, this time whilst a replicator is actively pushing them
for i := 25; i < 50; i++ {
version = btcRunner.AddRev(btc.id, docID, &version, []byte(`{"update_count":`+strconv.Itoa(i)+`,"_attachments": {"hello.txt": {"revpos":1,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`))
}

rt.WaitForVersion(docID, version)
// Re-read the stat after the continuous push has delivered the final revision.
proveAttachmentAfter = btc.pushReplication.replicationStats.ProveAttachment.Value()
assert.Equal(t, proveAttachmentBefore, proveAttachmentAfter)
})
}

func TestAttachmentWithErroneousRevPos(t *testing.T) {
rtConfig := &RestTesterConfig{
GuestEnabled: true,
Expand Down
2 changes: 2 additions & 0 deletions rest/blip_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ func proposeChangesEntryForDoc(doc *clientDoc) proposeChangeBatchEntry {
if i == 0 {
// skip current rev
continue
} else if i == 19 {
break // only send 20 history entries
}
revisionHistory = append(revisionHistory, doc._revisionsBySeq[seq].version)
}
Expand Down

0 comments on commit 8a9d534

Please sign in to comment.