Skip to content

Commit

Permalink
CBG-4541: rev cache changes merge into anemone (#7405)
Browse files Browse the repository at this point in the history
Co-authored-by: Tor Colvin <[email protected]>
  • Loading branch information
gregns1 and torcolvin authored Mar 7, 2025
1 parent 8497bc0 commit ed73f55
Show file tree
Hide file tree
Showing 9 changed files with 1,208 additions and 544 deletions.
2 changes: 1 addition & 1 deletion db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {

// Now add the entry for the new doc revision:
if len(rawUserXattr) > 0 {
collection.revisionCache.RemoveWithRev(docID, syncData.CurrentRev)
collection.revisionCache.RemoveWithRev(ctx, docID, syncData.CurrentRev)
}

change := &LogEntry{
Expand Down
28 changes: 15 additions & 13 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *DatabaseCollection) GetDocument(ctx context.Context, docid string, unma
return doc, err
}

// Lowest-level method that reads a document from the bucket
// GetDocumentWithRaw returns the document from the bucket. This may perform an on-demand import.
func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
key := realDocID(docid)
if key == "" {
Expand Down Expand Up @@ -1098,9 +1098,8 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod

docUpdateEvent := NewVersion
allowImport := db.UseXattrs()

doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, docUpdateEvent, nil, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {

updateRevCache := true
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, docUpdateEvent, nil, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
var isSgWrite bool
var crc32Match bool

Expand Down Expand Up @@ -1224,7 +1223,8 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont

docUpdateEvent := ExistingVersion
allowImport := db.UseXattrs()
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
updateRevCache := true
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)

var isSgWrite bool
Expand Down Expand Up @@ -1386,8 +1386,8 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c
}

allowImport := db.UseXattrs()
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {

updateRevCache := true
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)

var isSgWrite bool
Expand Down Expand Up @@ -2363,7 +2363,7 @@ type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAtta
// On cas failure, the document will still be reloaded from the bucket as usual.
// 3. If isImport=true, document body will not be updated - only metadata xattr(s)

func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry *uint32, opts *sgbucket.MutateInOptions, docUpdateEvent DocUpdateType, existingDoc *sgbucket.BucketDocument, isImport bool, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry *uint32, opts *sgbucket.MutateInOptions, docUpdateEvent DocUpdateType, existingDoc *sgbucket.BucketDocument, isImport bool, updateRevCache bool, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
key := realDocID(docid)
if key == "" {
return nil, "", base.HTTPErrorf(400, "Invalid doc ID")
Expand Down Expand Up @@ -2533,7 +2533,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

// Prior to saving doc, remove the revision in cache
if createNewRevIDSkipped {
db.revisionCache.RemoveWithRev(doc.ID, doc.CurrentRev)
db.revisionCache.RemoveWithRev(ctx, doc.ID, doc.CurrentRev)
}

base.DebugfCtx(ctx, base.KeyCRUD, "Saving doc (seq: #%d, id: %v rev: %v)", doc.Sequence, base.UD(doc.ID), doc.CurrentRev)
Expand Down Expand Up @@ -2633,10 +2633,12 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
CV: &Version{SourceID: doc.HLV.SourceID, Value: doc.HLV.Version},
}

if createNewRevIDSkipped {
db.revisionCache.Upsert(ctx, documentRevision)
} else {
db.revisionCache.Put(ctx, documentRevision)
if updateRevCache {
if createNewRevIDSkipped {
db.revisionCache.Upsert(ctx, documentRevision)
} else {
db.revisionCache.Put(ctx, documentRevision)
}
}

if db.eventMgr().HasHandlerForEvent(DocumentChange) {
Expand Down
4 changes: 3 additions & 1 deletion db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
}

docUpdateEvent := Import
docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, docUpdateEvent, existingDoc, true, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// do not update rev cache for on-demand imports
updateRevCache := mode == ImportFromFeed
docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, docUpdateEvent, existingDoc, true, updateRevCache, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// Perform cas mismatch check first, as we want to identify cas mismatch before triggering migrate handling.
// If there's a cas mismatch, the doc has been updated since the version that triggered the import. Handling depends on import mode.
if doc.Cas != existingDoc.Cas {
Expand Down
1 change: 0 additions & 1 deletion db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,6 @@ func TestImportWithCasFailureUpdate(t *testing.T) {
assert.NoError(t, err, "Error unmarshalling body")

runOnce = true

// Trigger import
_, err = collection.importDoc(ctx, testcase.docname, bodyD, nil, false, 0, existingBucketDoc, ImportOnDemand)
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions db/revision_cache_bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ func (rc *BypassRevisionCache) Upsert(ctx context.Context, docRev DocumentRevisi
// no-op
}

func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string, collectionID uint32) {
func (rc *BypassRevisionCache) RemoveWithRev(ctx context.Context, docID, revID string, collectionID uint32) {
// no-op
}

func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *Version, collectionID uint32) {
func (rc *BypassRevisionCache) RemoveWithCV(ctx context.Context, docID string, cv *Version, collectionID uint32) {
// no-op
}

Expand Down
17 changes: 11 additions & 6 deletions db/revision_cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type RevisionCache interface {
Upsert(ctx context.Context, docRev DocumentRevision, collectionID uint32)

// RemoveWithRev evicts a revision from the cache using its revID.
RemoveWithRev(docID, revID string, collectionID uint32)
RemoveWithRev(ctx context.Context, docID, revID string, collectionID uint32)

// RemoveWithCV evicts a revision from the cache using its current version.
RemoveWithCV(docID string, cv *Version, collectionID uint32)
RemoveWithCV(ctx context.Context, docID string, cv *Version, collectionID uint32)

// UpdateDelta stores the given toDelta value in the given rev if cached
UpdateDelta(ctx context.Context, docID, revID string, collectionID uint32, toDelta RevisionDelta)
Expand Down Expand Up @@ -168,13 +168,13 @@ func (c *collectionRevisionCache) Upsert(ctx context.Context, docRev DocumentRev
}

// RemoveWithRev is for per collection access to Remove method
func (c *collectionRevisionCache) RemoveWithRev(docID, revID string) {
(*c.revCache).RemoveWithRev(docID, revID, c.collectionID)
func (c *collectionRevisionCache) RemoveWithRev(ctx context.Context, docID, revID string) {
(*c.revCache).RemoveWithRev(ctx, docID, revID, c.collectionID)
}

// RemoveWithCV is for per collection access to Remove method
func (c *collectionRevisionCache) RemoveWithCV(docID string, cv *Version) {
(*c.revCache).RemoveWithCV(docID, cv, c.collectionID)
func (c *collectionRevisionCache) RemoveWithCV(ctx context.Context, docID string, cv *Version) {
(*c.revCache).RemoveWithCV(ctx, docID, cv, c.collectionID)
}

// UpdateDelta is for per collection access to UpdateDelta method
Expand Down Expand Up @@ -457,6 +457,11 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache
channels = doc.SyncData.getCurrentChannels()
revid = doc.CurrentRev
hlv = doc.HLV
validatedHistory, getHistoryErr := doc.History.getHistory(revid)
if getHistoryErr != nil {
return bodyBytes, history, channels, removed, attachments, deleted, doc.Expiry, revid, hlv, err
}
history = encodeRevisions(ctx, doc.ID, validatedHistory)

return bodyBytes, history, channels, removed, attachments, deleted, doc.Expiry, revid, hlv, err
}
Expand Down
Loading

0 comments on commit ed73f55

Please sign in to comment.