From ed73f5530412e6fc97fe6c7c4da22aedf7b5748f Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Fri, 7 Mar 2025 03:03:19 +0000 Subject: [PATCH] CBG-4541: rev cache changes merge into anemone (#7405) Co-authored-by: Tor Colvin --- db/change_cache.go | 2 +- db/crud.go | 28 +- db/import.go | 4 +- db/import_test.go | 1 - db/revision_cache_bypass.go | 4 +- db/revision_cache_interface.go | 17 +- db/revision_cache_lru.go | 284 ++++--- db/revision_cache_test.go | 1405 ++++++++++++++++++++++---------- rest/importtest/import_test.go | 7 +- 9 files changed, 1208 insertions(+), 544 deletions(-) diff --git a/db/change_cache.go b/db/change_cache.go index d3265a9cbc..2e7298a561 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -509,7 +509,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) { // Now add the entry for the new doc revision: if len(rawUserXattr) > 0 { - collection.revisionCache.RemoveWithRev(docID, syncData.CurrentRev) + collection.revisionCache.RemoveWithRev(ctx, docID, syncData.CurrentRev) } change := &LogEntry{ diff --git a/db/crud.go b/db/crud.go index 976b0b7eb8..a1660afb38 100644 --- a/db/crud.go +++ b/db/crud.go @@ -64,7 +64,7 @@ func (c *DatabaseCollection) GetDocument(ctx context.Context, docid string, unma return doc, err } -// Lowest-level method that reads a document from the bucket +// GetDocumentWithRaw returns the document from the bucket. This may perform an on-demand import. 
func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) { key := realDocID(docid) if key == "" { @@ -1098,9 +1098,8 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod docUpdateEvent := NewVersion allowImport := db.UseXattrs() - - doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, docUpdateEvent, nil, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { - + updateRevCache := true + doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &expiry, nil, docUpdateEvent, nil, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { var isSgWrite bool var crc32Match bool @@ -1224,7 +1223,8 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont docUpdateEvent := ExistingVersion allowImport := db.UseXattrs() - doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { + updateRevCache := true + doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { // (Be careful: this block can be invoked multiple times if there are races!) 
var isSgWrite bool @@ -1386,8 +1386,8 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c } allowImport := db.UseXattrs() - doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { - + updateRevCache := true + doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, &newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, false, updateRevCache, func(doc *Document) (resultDoc *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { // (Be careful: this block can be invoked multiple times if there are races!) var isSgWrite bool @@ -2363,7 +2363,7 @@ type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAtta // On cas failure, the document will still be reloaded from the bucket as usual. // 3. 
If isImport=true, document body will not be updated - only metadata xattr(s) -func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry *uint32, opts *sgbucket.MutateInOptions, docUpdateEvent DocUpdateType, existingDoc *sgbucket.BucketDocument, isImport bool, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) { +func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry *uint32, opts *sgbucket.MutateInOptions, docUpdateEvent DocUpdateType, existingDoc *sgbucket.BucketDocument, isImport bool, updateRevCache bool, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) { key := realDocID(docid) if key == "" { return nil, "", base.HTTPErrorf(400, "Invalid doc ID") @@ -2533,7 +2533,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // Prior to saving doc, remove the revision in cache if createNewRevIDSkipped { - db.revisionCache.RemoveWithRev(doc.ID, doc.CurrentRev) + db.revisionCache.RemoveWithRev(ctx, doc.ID, doc.CurrentRev) } base.DebugfCtx(ctx, base.KeyCRUD, "Saving doc (seq: #%d, id: %v rev: %v)", doc.Sequence, base.UD(doc.ID), doc.CurrentRev) @@ -2633,10 +2633,12 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do CV: &Version{SourceID: doc.HLV.SourceID, Value: doc.HLV.Version}, } - if createNewRevIDSkipped { - db.revisionCache.Upsert(ctx, documentRevision) - } else { - db.revisionCache.Put(ctx, documentRevision) + if updateRevCache { + if createNewRevIDSkipped { + db.revisionCache.Upsert(ctx, documentRevision) + } else { + db.revisionCache.Put(ctx, documentRevision) + } } if db.eventMgr().HasHandlerForEvent(DocumentChange) { diff --git a/db/import.go b/db/import.go index e89003b310..3502a3f0d2 100644 --- a/db/import.go +++ b/db/import.go @@ -154,7 +154,9 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, 
docid strin } docUpdateEvent := Import - docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, docUpdateEvent, existingDoc, true, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { + // do not update rev cache for on-demand imports + updateRevCache := mode == ImportFromFeed + docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, docUpdateEvent, existingDoc, true, updateRevCache, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { // Perform cas mismatch check first, as we want to identify cas mismatch before triggering migrate handling. // If there's a cas mismatch, the doc has been updated since the version that triggered the import. Handling depends on import mode. if doc.Cas != existingDoc.Cas { diff --git a/db/import_test.go b/db/import_test.go index 1bca6a80a4..1b2a733dd8 100644 --- a/db/import_test.go +++ b/db/import_test.go @@ -545,7 +545,6 @@ func TestImportWithCasFailureUpdate(t *testing.T) { assert.NoError(t, err, "Error unmarshalling body") runOnce = true - // Trigger import _, err = collection.importDoc(ctx, testcase.docname, bodyD, nil, false, 0, existingBucketDoc, ImportOnDemand) assert.NoError(t, err) diff --git a/db/revision_cache_bypass.go b/db/revision_cache_bypass.go index 036266960a..38fcc20b60 100644 --- a/db/revision_cache_bypass.go +++ b/db/revision_cache_bypass.go @@ -124,11 +124,11 @@ func (rc *BypassRevisionCache) Upsert(ctx context.Context, docRev DocumentRevisi // no-op } -func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string, collectionID uint32) { +func (rc *BypassRevisionCache) RemoveWithRev(ctx context.Context, docID, revID string, collectionID uint32) { // no-op } -func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *Version, collectionID 
uint32) { +func (rc *BypassRevisionCache) RemoveWithCV(ctx context.Context, docID string, cv *Version, collectionID uint32) { // no-op } diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index 6a71b2ed8e..940c395bd0 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -50,10 +50,10 @@ type RevisionCache interface { Upsert(ctx context.Context, docRev DocumentRevision, collectionID uint32) // RemoveWithRev evicts a revision from the cache using its revID. - RemoveWithRev(docID, revID string, collectionID uint32) + RemoveWithRev(ctx context.Context, docID, revID string, collectionID uint32) // RemoveWithCV evicts a revision from the cache using its current version. - RemoveWithCV(docID string, cv *Version, collectionID uint32) + RemoveWithCV(ctx context.Context, docID string, cv *Version, collectionID uint32) // UpdateDelta stores the given toDelta value in the given rev if cached UpdateDelta(ctx context.Context, docID, revID string, collectionID uint32, toDelta RevisionDelta) @@ -168,13 +168,13 @@ func (c *collectionRevisionCache) Upsert(ctx context.Context, docRev DocumentRev } // RemoveWithRev is for per collection access to Remove method -func (c *collectionRevisionCache) RemoveWithRev(docID, revID string) { - (*c.revCache).RemoveWithRev(docID, revID, c.collectionID) +func (c *collectionRevisionCache) RemoveWithRev(ctx context.Context, docID, revID string) { + (*c.revCache).RemoveWithRev(ctx, docID, revID, c.collectionID) } // RemoveWithCV is for per collection access to Remove method -func (c *collectionRevisionCache) RemoveWithCV(docID string, cv *Version) { - (*c.revCache).RemoveWithCV(docID, cv, c.collectionID) +func (c *collectionRevisionCache) RemoveWithCV(ctx context.Context, docID string, cv *Version) { + (*c.revCache).RemoveWithCV(ctx, docID, cv, c.collectionID) } // UpdateDelta is for per collection access to UpdateDelta method @@ -457,6 +457,11 @@ func revCacheLoaderForDocumentCV(ctx 
context.Context, backingStore RevisionCache channels = doc.SyncData.getCurrentChannels() revid = doc.CurrentRev hlv = doc.HLV + validatedHistory, getHistoryErr := doc.History.getHistory(revid) + if getHistoryErr != nil { + return bodyBytes, history, channels, removed, attachments, deleted, doc.Expiry, revid, hlv, err + } + history = encodeRevisions(ctx, doc.ID, validatedHistory) return bodyBytes, history, channels, removed, attachments, deleted, doc.Expiry, revid, hlv, err } diff --git a/db/revision_cache_lru.go b/db/revision_cache_lru.go index 2e9eda7d44..dc3336d083 100644 --- a/db/revision_cache_lru.go +++ b/db/revision_cache_lru.go @@ -14,6 +14,7 @@ import ( "container/list" "context" "sync" + "sync/atomic" "time" sgbucket "github.com/couchbase/sg-bucket" @@ -84,12 +85,12 @@ func (sc *ShardedLRURevisionCache) Upsert(ctx context.Context, docRev DocumentRe sc.getShard(docRev.DocID).Upsert(ctx, docRev, collectionID) } -func (sc *ShardedLRURevisionCache) RemoveWithRev(docID, revID string, collectionID uint32) { - sc.getShard(docID).RemoveWithRev(docID, revID, collectionID) +func (sc *ShardedLRURevisionCache) RemoveWithRev(ctx context.Context, docID, revID string, collectionID uint32) { + sc.getShard(docID).RemoveWithRev(ctx, docID, revID, collectionID) } -func (sc *ShardedLRURevisionCache) RemoveWithCV(docID string, cv *Version, collectionID uint32) { - sc.getShard(docID).RemoveWithCV(docID, cv, collectionID) +func (sc *ShardedLRURevisionCache) RemoveWithCV(ctx context.Context, docID string, cv *Version, collectionID uint32) { + sc.getShard(docID).RemoveWithCV(ctx, docID, cv, collectionID) } // An LRU cache of document revision bodies, together with their channel access. @@ -124,8 +125,9 @@ type revCacheValue struct { lock sync.RWMutex deleted bool removed bool - itemBytes int64 + itemBytes atomic.Int64 collectionID uint32 + canEvict atomic.Bool } // Creates a revision cache with the given capacity and an optional loader function. 
@@ -170,26 +172,33 @@ func (rc *LRURevisionCache) Peek(ctx context.Context, docID, revID string, colle // Attempt to update the delta on a revision cache entry. If the entry is no longer resident in the cache, // fails silently func (rc *LRURevisionCache) UpdateDelta(ctx context.Context, docID, revID string, collectionID uint32, toDelta RevisionDelta) { - value := rc.getValue(docID, revID, collectionID, false) + value := rc.getValue(ctx, docID, revID, collectionID, false) if value != nil { outGoingBytes := value.updateDelta(toDelta) if outGoingBytes != 0 { - rc.updateRevCacheMemoryUsage(outGoingBytes) + rc.currMemoryUsage.Add(outGoingBytes) + rc.cacheMemoryBytesStat.Add(outGoingBytes) } // check for memory based eviction - rc.revCacheMemoryBasedEviction() + rc.revCacheMemoryBasedEviction(ctx) } } func (rc *LRURevisionCache) UpdateDeltaCV(ctx context.Context, docID string, cv *Version, collectionID uint32, toDelta RevisionDelta) { - value := rc.getValueByCV(docID, cv, collectionID, false) + value := rc.getValueByCV(ctx, docID, cv, collectionID, false) if value != nil { - value.updateDelta(toDelta) + outGoingBytes := value.updateDelta(toDelta) + if outGoingBytes != 0 { + rc.currMemoryUsage.Add(outGoingBytes) + rc.cacheMemoryBytesStat.Add(outGoingBytes) + } + // check for memory based eviction + rc.revCacheMemoryBasedEviction(ctx) } } func (rc *LRURevisionCache) getFromCacheByRev(ctx context.Context, docID, revID string, collectionID uint32, loadOnCacheMiss, includeDelta bool) (DocumentRevision, error) { - value := rc.getValue(docID, revID, collectionID, loadOnCacheMiss) + value := rc.getValue(ctx, docID, revID, collectionID, loadOnCacheMiss) if value == nil { return DocumentRevision{}, nil } @@ -199,10 +208,10 @@ func (rc *LRURevisionCache) getFromCacheByRev(ctx context.Context, docID, revID if !statEvent && err == nil { // cache miss so we had to load doc, increment memory count - rc.updateRevCacheMemoryUsage(value.getItemBytes()) + rc.incrRevCacheMemoryUsage(ctx, 
value.getItemBytes()) // check for memory based eviction - rc.revCacheMemoryBasedEviction() - rc.addToHLVMapPostLoad(docID, docRev.RevID, docRev.CV) + rc.revCacheMemoryBasedEviction(ctx) + rc.addToHLVMapPostLoad(docID, docRev.RevID, docRev.CV, collectionID) } if err != nil { @@ -213,7 +222,7 @@ func (rc *LRURevisionCache) getFromCacheByRev(ctx context.Context, docID, revID } func (rc *LRURevisionCache) getFromCacheByCV(ctx context.Context, docID string, cv *Version, collectionID uint32, loadCacheOnMiss bool, includeDelta bool) (DocumentRevision, error) { - value := rc.getValueByCV(docID, cv, collectionID, loadCacheOnMiss) + value := rc.getValueByCV(ctx, docID, cv, collectionID, loadCacheOnMiss) if value == nil { return DocumentRevision{}, nil } @@ -226,6 +235,10 @@ func (rc *LRURevisionCache) getFromCacheByCV(ctx context.Context, docID string, } if !cacheHit && err == nil { + // cache miss so we had to load doc, increment memory count + rc.incrRevCacheMemoryUsage(ctx, value.getItemBytes()) + // check for memory based eviction + rc.revCacheMemoryBasedEviction(ctx) rc.addToRevMapPostLoad(docID, docRev.RevID, docRev.CV, collectionID) } @@ -250,23 +263,23 @@ func (rc *LRURevisionCache) GetActive(ctx context.Context, docID string, collect } // Retrieve from or add to rev cache - value := rc.getValue(docID, bucketDoc.CurrentRev, collectionID, true) + value := rc.getValue(ctx, docID, bucketDoc.CurrentRev, collectionID, true) docRev, statEvent, err := value.loadForDoc(ctx, rc.backingStores[collectionID], bucketDoc) rc.statsRecorderFunc(statEvent) if !statEvent && err == nil { // cache miss so we had to load doc, increment memory count - rc.updateRevCacheMemoryUsage(value.getItemBytes()) + rc.incrRevCacheMemoryUsage(ctx, value.getItemBytes()) // check for rev cache memory based eviction - rc.revCacheMemoryBasedEviction() + rc.revCacheMemoryBasedEviction(ctx) } if err != nil { rc.removeValue(value) // don't keep failed loads in the cache } else { // add successfully fetched 
value to CV lookup map too - rc.addToHLVMapPostLoad(docID, docRev.RevID, docRev.CV) + rc.addToHLVMapPostLoad(docID, docRev.RevID, docRev.CV, collectionID) } return docRev, err @@ -290,17 +303,17 @@ func (rc *LRURevisionCache) Put(ctx context.Context, docRev DocumentRevision, co // doc should always have a cv present in a PUT operation on the cache (update HLV is called before hand in doc update process) // thus we can call getValueByCV directly the update the rev lookup post this - value := rc.getValueByCV(docRev.DocID, docRev.CV, collectionID, true) + value := rc.getValueByCV(ctx, docRev.DocID, docRev.CV, collectionID, true) // increment incoming bytes docRev.CalculateBytes() - rc.updateRevCacheMemoryUsage(docRev.MemoryBytes) + rc.incrRevCacheMemoryUsage(ctx, docRev.MemoryBytes) value.store(docRev) // add new doc version to the rev id lookup map rc.addToRevMapPostLoad(docRev.DocID, docRev.RevID, docRev.CV, collectionID) // check for rev cache memory based eviction - rc.revCacheMemoryBasedEviction() + rc.revCacheMemoryBasedEviction(ctx) } // Upsert a revision in the cache. 
@@ -323,7 +336,7 @@ func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision, if found { revItem := existingElem.Value.(*revCacheValue) // decrement item bytes by the removed item - rc.updateRevCacheMemoryUsage(-revItem.getItemBytes()) + rc._decrRevCacheMemoryUsage(ctx, -revItem.getItemBytes()) rc.lruList.Remove(existingElem) newItem = false } @@ -341,35 +354,25 @@ func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision, } // Purge oldest item if over number capacity - var numItemsRemoved int - for len(rc.cache) > int(rc.capacity) { - rc.purgeOldest_() - numItemsRemoved++ + numItemsRemoved, numBytesEvicted := rc._numberCapacityEviction() + if numBytesEvicted > 0 { + rc._decrRevCacheMemoryUsage(ctx, -numBytesEvicted) } + rc.lock.Unlock() // release lock after eviction finished if numItemsRemoved > 0 { - rc.cacheNumItems.Add(int64(-numItemsRemoved)) + rc.cacheNumItems.Add(-numItemsRemoved) } docRev.CalculateBytes() // add new item bytes to overall count - rc.updateRevCacheMemoryUsage(docRev.MemoryBytes) - + rc.incrRevCacheMemoryUsage(ctx, docRev.MemoryBytes) value.store(docRev) // check we aren't over memory capacity, if so perform eviction - numItemsRemoved = 0 - if rc.memoryCapacity > 0 { - for rc.currMemoryUsage.Value() > rc.memoryCapacity { - rc.purgeOldest_() - numItemsRemoved++ - } - rc.cacheNumItems.Add(int64(-numItemsRemoved)) - } - - rc.lock.Unlock() + rc.revCacheMemoryBasedEviction(ctx) } -func (rc *LRURevisionCache) getValue(docID, revID string, collectionID uint32, create bool) (value *revCacheValue) { +func (rc *LRURevisionCache) getValue(ctx context.Context, docID, revID string, collectionID uint32, create bool) (value *revCacheValue) { if docID == "" || revID == "" { // TODO: CBG-1948 panic("RevisionCache: invalid empty doc/rev id") @@ -381,26 +384,26 @@ func (rc *LRURevisionCache) getValue(docID, revID string, collectionID uint32, c value = elem.Value.(*revCacheValue) } else if create { value = 
&revCacheValue{id: docID, revID: revID, collectionID: collectionID} + value.canEvict.Store(false) rc.cache[key] = rc.lruList.PushFront(value) rc.cacheNumItems.Add(1) - // evict if over number capacity - var numItemsRemoved int - for rc.lruList.Len() > int(rc.capacity) { - rc.purgeOldest_() - numItemsRemoved++ + numItemsRemoved, numBytesEvicted := rc._numberCapacityEviction() + if numBytesEvicted > 0 { + rc._decrRevCacheMemoryUsage(ctx, -numBytesEvicted) } + rc.lock.Unlock() // release lock as eviction is finished if numItemsRemoved > 0 { - rc.cacheNumItems.Add(int64(-numItemsRemoved)) + rc.cacheNumItems.Add(-numItemsRemoved) } + // return early as rev cache mutex has been released at this point + return } rc.lock.Unlock() return } -// getValueByCV gets a value from rev cache by CV, if not found and create is true, will add the value to cache and both lookup maps - -func (rc *LRURevisionCache) getValueByCV(docID string, cv *Version, collectionID uint32, create bool) (value *revCacheValue) { +func (rc *LRURevisionCache) getValueByCV(ctx context.Context, docID string, cv *Version, collectionID uint32, create bool) (value *revCacheValue) { if docID == "" || cv == nil { return nil } @@ -412,20 +415,23 @@ func (rc *LRURevisionCache) getValueByCV(docID string, cv *Version, collectionID value = elem.Value.(*revCacheValue) } else if create { value = &revCacheValue{id: docID, cv: *cv, collectionID: collectionID} + value.canEvict.Store(false) newElem := rc.lruList.PushFront(value) rc.hlvCache[key] = newElem rc.cacheNumItems.Add(1) // evict if over number capacity - var numItemsRemoved int - for rc.lruList.Len() > int(rc.capacity) { - rc.purgeOldest_() - numItemsRemoved++ + numItemsRemoved, numBytesEvicted := rc._numberCapacityEviction() + if numBytesEvicted > 0 { + rc._decrRevCacheMemoryUsage(ctx, -numBytesEvicted) } + rc.lock.Unlock() // release lock as eviction is finished if numItemsRemoved > 0 { - rc.cacheNumItems.Add(int64(-numItemsRemoved)) + 
rc.cacheNumItems.Add(-numItemsRemoved) } + // return early as rev cache mutex has been released at this point + return } rc.lock.Unlock() return @@ -462,9 +468,9 @@ func (rc *LRURevisionCache) addToRevMapPostLoad(docID, revID string, cv *Version } // addToHLVMapPostLoad will generate and entry in the CV lookup map for a new document entering the cache -func (rc *LRURevisionCache) addToHLVMapPostLoad(docID, revID string, cv *Version) { - legacyKey := IDAndRev{DocID: docID, RevID: revID} - key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Value} +func (rc *LRURevisionCache) addToHLVMapPostLoad(docID, revID string, cv *Version, collectionID uint32) { + legacyKey := IDAndRev{DocID: docID, RevID: revID, CollectionID: collectionID} + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Value, CollectionID: collectionID} rc.lock.Lock() defer rc.lock.Unlock() @@ -485,21 +491,24 @@ func (rc *LRURevisionCache) addToHLVMapPostLoad(docID, revID string, cv *Version // if CV map and rev map are targeting different list elements, update to have both use the cv map element rc.cache[legacyKey] = cvElem rc.lruList.Remove(revElem) + } else { + // if not found we need to add the element to the hlv lookup + rc.hlvCache[key] = revElem } } // Remove removes a value from the revision cache, if present. 
-func (rc *LRURevisionCache) RemoveWithRev(docID, revID string, collectionID uint32) { - rc.removeFromCacheByRev(docID, revID, collectionID) +func (rc *LRURevisionCache) RemoveWithRev(ctx context.Context, docID, revID string, collectionID uint32) { + rc.removeFromCacheByRev(ctx, docID, revID, collectionID) } // RemoveWithCV removes a value from rev cache by CV reference if present -func (rc *LRURevisionCache) RemoveWithCV(docID string, cv *Version, collectionID uint32) { - rc.removeFromCacheByCV(docID, cv, collectionID) +func (rc *LRURevisionCache) RemoveWithCV(ctx context.Context, docID string, cv *Version, collectionID uint32) { + rc.removeFromCacheByCV(ctx, docID, cv, collectionID) } // removeFromCacheByCV removes an entry from rev cache by CV -func (rc *LRURevisionCache) removeFromCacheByCV(docID string, cv *Version, collectionID uint32) { +func (rc *LRURevisionCache) removeFromCacheByCV(ctx context.Context, docID string, cv *Version, collectionID uint32) { key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Value, CollectionID: collectionID} rc.lock.Lock() defer rc.lock.Unlock() @@ -509,15 +518,17 @@ func (rc *LRURevisionCache) removeFromCacheByCV(docID string, cv *Version, colle } // grab the revid key from the value to enable us to remove the reference from the rev lookup map too elem := element.Value.(*revCacheValue) - legacyKey := IDAndRev{DocID: docID, RevID: elem.revID} + legacyKey := IDAndRev{DocID: docID, RevID: elem.revID, CollectionID: collectionID} rc.lruList.Remove(element) delete(rc.hlvCache, key) // remove from rev lookup map too delete(rc.cache, legacyKey) + rc._decrRevCacheMemoryUsage(ctx, -elem.getItemBytes()) + rc.cacheNumItems.Add(-1) } // removeFromCacheByRev removes an entry from rev cache by revID -func (rc *LRURevisionCache) removeFromCacheByRev(docID, revID string, collectionID uint32) { +func (rc *LRURevisionCache) removeFromCacheByRev(ctx context.Context, docID, revID string, collectionID uint32) { key := IDAndRev{DocID: 
docID, RevID: revID, CollectionID: collectionID} rc.lock.Lock() defer rc.lock.Unlock() @@ -527,15 +538,14 @@ func (rc *LRURevisionCache) removeFromCacheByRev(docID, revID string, collection } // grab the cv key from the value to enable us to remove the reference from the rev lookup map too elem := element.Value.(*revCacheValue) - hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.Value} + hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.Value, CollectionID: collectionID} rc.lruList.Remove(element) // decrement the overall memory bytes count - revItem := element.Value.(*revCacheValue) - rc.updateRevCacheMemoryUsage(-revItem.getItemBytes()) + rc._decrRevCacheMemoryUsage(ctx, -elem.getItemBytes()) delete(rc.cache, key) - rc.cacheNumItems.Add(-1) // remove from CV lookup map too delete(rc.hlvCache, hlvKey) + rc.cacheNumItems.Add(-1) } // removeValue removes a value from the revision cache, if present and the value matches the the value. If there's an item in the revision cache with a matching docID and revID but the document is different, this item will not be removed from the rev cache. 
@@ -562,14 +572,23 @@ func (rc *LRURevisionCache) removeValue(value *revCacheValue) { } } -func (rc *LRURevisionCache) purgeOldest_() { - value := rc.lruList.Remove(rc.lruList.Back()).(*revCacheValue) - revKey := IDAndRev{DocID: value.id, RevID: value.revID, CollectionID: value.collectionID} - hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Value, CollectionID: value.collectionID} - delete(rc.cache, revKey) - delete(rc.hlvCache, hlvKey) - // decrement memory overall size - rc.updateRevCacheMemoryUsage(-value.getItemBytes()) +// _numberCapacityEviction will iterate removing the last element in cache til we fall below the maximum number of items +// threshold for this shard, retuning the bytes evicted and number of items evicted +func (rc *LRURevisionCache) _numberCapacityEviction() (numItemsEvicted int64, numBytesEvicted int64) { + for rc.lruList.Len() > int(rc.capacity) { + value := rc._findEvictionValue() + if value == nil { + // no more ready for eviction + break + } + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Value, CollectionID: value.collectionID} + revKey := IDAndRev{DocID: value.id, RevID: value.revID, CollectionID: value.collectionID} + delete(rc.cache, revKey) + delete(rc.hlvCache, hlvKey) + numItemsEvicted++ + numBytesEvicted += value.getItemBytes() + } + return numItemsEvicted, numBytesEvicted } // Gets the body etc. out of a revCacheValue. 
If they aren't present already, the loader func @@ -601,6 +620,12 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache if value.bodyBytes != nil || value.err != nil { cacheHit = true } else { + // we only want to set can evict if we are having to load the doc from the bucket into value, + // avoiding setting this value multiple times in the case other goroutines are loading the same value + defer func() { + value.canEvict.Store(true) // once done loading doc we can set the value to be ready for eviction + }() + cacheHit = false hlv := &HybridLogicalVector{} if value.revID == "" { @@ -630,7 +655,7 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache // if not cache hit, we loaded from bucket. Calculate doc rev size and assign to rev cache value if !cacheHit { docRev.CalculateBytes() - value.itemBytes = docRev.MemoryBytes + value.itemBytes.Store(docRev.MemoryBytes) } value.lock.Unlock() @@ -678,6 +703,12 @@ func (value *revCacheValue) loadForDoc(ctx context.Context, backingStore Revisio if value.bodyBytes != nil || value.err != nil { cacheHit = true } else { + // we only want to set can evict if we are having to load the doc from the bucket into value, + // avoiding setting this value multiple times in the case other goroutines are loading the same value + defer func() { + value.canEvict.Store(true) // once done loading doc we can set the value to be ready for eviction + }() + cacheHit = false hlv := &HybridLogicalVector{} if value.revID == "" { @@ -695,7 +726,7 @@ func (value *revCacheValue) loadForDoc(ctx context.Context, backingStore Revisio // if not cache hit, we loaded from bucket. 
Calculate doc rev size and assign to rev cache value if !cacheHit { docRev.CalculateBytes() - value.itemBytes = docRev.MemoryBytes + value.itemBytes.Store(docRev.MemoryBytes) } value.lock.Unlock() return docRev, cacheHit, err @@ -706,6 +737,7 @@ func (value *revCacheValue) store(docRev DocumentRevision) { value.lock.Lock() if value.bodyBytes == nil { value.revID = docRev.RevID + value.id = docRev.DocID value.bodyBytes = docRev.BodyBytes value.history = docRev.History value.channels = docRev.Channels @@ -713,10 +745,11 @@ func (value *revCacheValue) store(docRev DocumentRevision) { value.attachments = docRev.Attachments.ShallowCopy() // Don't store attachments the caller might later mutate value.deleted = docRev.Deleted value.err = nil - value.itemBytes = docRev.MemoryBytes + value.itemBytes.Store(docRev.MemoryBytes) value.hlvHistory = docRev.hlvHistory } value.lock.Unlock() + value.canEvict.Store(true) // now we have stored the doc revision in the cache, we can allow eviction } func (value *revCacheValue) updateDelta(toDelta RevisionDelta) (diffInBytes int64) { @@ -729,17 +762,15 @@ func (value *revCacheValue) updateDelta(toDelta RevisionDelta) (diffInBytes int6 diffInBytes = toDelta.totalDeltaBytes - previousDeltaBytes value.delta = &toDelta if diffInBytes != 0 { - value.itemBytes += diffInBytes + value.itemBytes.Add(diffInBytes) } value.lock.Unlock() return diffInBytes } -// getItemBytes acquires read lock and retrieves the rev cache items overall memory footprint +// getItemBytes atomically retrieves the rev cache items overall memory footprint func (value *revCacheValue) getItemBytes() int64 { - value.lock.RLock() - defer value.lock.RUnlock() - return value.itemBytes + return value.itemBytes.Load() } // CalculateBytes will calculate the bytes from revisions in the document, body and channels on the document @@ -778,31 +809,92 @@ func (delta *RevisionDelta) CalculateDeltaBytes() { } // revCacheMemoryBasedEviction checks for rev cache eviction, if required calls 
performEviction which will acquire lock to evict -func (rc *LRURevisionCache) revCacheMemoryBasedEviction() { +func (rc *LRURevisionCache) revCacheMemoryBasedEviction(ctx context.Context) { // if memory capacity is not set, don't check for eviction this way if rc.memoryCapacity > 0 && rc.currMemoryUsage.Value() > rc.memoryCapacity { - rc.performEviction() + rc.performEviction(ctx) } } // performEviction will evict the oldest items in the cache till we are below the memory threshold -func (rc *LRURevisionCache) performEviction() { - rc.lock.Lock() - defer rc.lock.Unlock() - var numItemsRemoved int64 - for rc.currMemoryUsage.Value() > rc.memoryCapacity { - rc.purgeOldest_() - numItemsRemoved++ +func (rc *LRURevisionCache) performEviction(ctx context.Context) { + var numItemsRemoved, numBytesRemoved int64 + rc.lock.Lock() // hold rev cache lock to remove items from cache until we're below memory threshold for the shard + // check if we are over memory capacity after holding rev cache mutex (protect against another goroutine evicting whilst waiting for mutex above) + if currMemoryUsage := rc.currMemoryUsage.Value(); currMemoryUsage > rc.memoryCapacity { + // find amount of bytes needed to evict till below threshold + bytesNeededToRemove := currMemoryUsage - rc.memoryCapacity + for bytesNeededToRemove > numBytesRemoved { + value := rc._findEvictionValue() + if value == nil { + // no more values ready for eviction + break + } + revKey := IDAndRev{DocID: value.id, RevID: value.revID, CollectionID: value.collectionID} + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.Value, CollectionID: value.collectionID} + delete(rc.cache, revKey) + delete(rc.hlvCache, hlvKey) + numItemsRemoved++ + valueBytes := value.getItemBytes() + numBytesRemoved += valueBytes + } } + rc._decrRevCacheMemoryUsage(ctx, -numBytesRemoved) // need update rev cache memory stats before release lock to stop other goroutines evicting based on outdated stats + rc.lock.Unlock() 
// release lock after removing items from cache rc.cacheNumItems.Add(-numItemsRemoved) } -// updateRevCacheMemoryUsage atomically increases overall memory usage for cache and the actual rev cache objects usage -func (rc *LRURevisionCache) updateRevCacheMemoryUsage(bytesCount int64) { +// _decrRevCacheMemoryUsage atomically decreases overall memory usage for cache and the actual rev cache objects usage. +// You should be holding the rev cache lock when using this function to avoid eviction processes over-evicting items +func (rc *LRURevisionCache) _decrRevCacheMemoryUsage(ctx context.Context, bytesCount int64) { + // We need to keep track of the current LRURevisionCache memory usage AND the overall usage of the cache. We need + // overall memory usage for the stat added to show rev cache usage plus we need the current rev cache capacity of the + // LRURevisionCache object for sharding the rev cache. This way we can perform eviction on per shard basis much like + // we do with the number of items capacity eviction + if bytesCount > 0 { + // function is for decrementing memory usage, so return if incrementing + base.AssertfCtx(ctx, "Attempting to increment memory stats with inside decrement function, should use incrRevCacheMemoryUsage instead") + return + } + rc.currMemoryUsage.Add(bytesCount) + rc.cacheMemoryBytesStat.Add(bytesCount) +} + +// incrRevCacheMemoryUsage atomically increases overall memory usage for cache and the actual rev cache objects usage. +// You do not need to hold the rev cache lock when using this function +func (rc *LRURevisionCache) incrRevCacheMemoryUsage(ctx context.Context, bytesCount int64) { // We need to keep track of the current LRURevisionCache memory usage AND the overall usage of the cache. We need // overall memory usage for the stat added to show rev cache usage plus we need the current rev cache capacity of the // LRURevisionCache object for sharding the rev cache. 
This way we can perform eviction on per shard basis much like // we do with the number of items capacity eviction + if bytesCount < 0 { + // function is for incrementing memory usage, so return if decrementing + base.AssertfCtx(ctx, "Attempting to decrement memory stats with inside increment function, should use _decrRevCacheMemoryUsage instead whilst holding rev cache lock") + return + } rc.currMemoryUsage.Add(bytesCount) rc.cacheMemoryBytesStat.Add(bytesCount) } + +func (rc *LRURevisionCache) _findEvictionValue() *revCacheValue { + evictionCandidate := rc.lruList.Back() + revItem := evictionCandidate.Value.(*revCacheValue) + + if revItem.canEvict.Load() { + rc.lruList.Remove(evictionCandidate) + return revItem + } + + // iterate through list backwards to find value ready for eviction + evictionCandidate = evictionCandidate.Prev() + for evictionCandidate != nil { + revItem = evictionCandidate.Value.(*revCacheValue) + if revItem.canEvict.Load() { + rc.lruList.Remove(evictionCandidate) + return revItem + } + // check prev value + evictionCandidate = evictionCandidate.Prev() + } + return nil +} diff --git a/db/revision_cache_test.go b/db/revision_cache_test.go index 18305247d5..ae1dbcdad4 100644 --- a/db/revision_cache_test.go +++ b/db/revision_cache_test.go @@ -13,6 +13,7 @@ package db import ( "context" "fmt" + "log" "math/rand" "strconv" "sync" @@ -137,6 +138,8 @@ func TestLRURevisionCacheEviction(t *testing.T) { } assert.Equal(t, int64(10), cacheNumItems.Value()) assert.Equal(t, int64(20), memoryBytesCounted.Value()) + assert.Equal(t, 10, len(cache.cache)) + assert.Equal(t, 10, len(cache.hlvCache)) // Get them back out for i := 0; i < 10; i++ { @@ -150,6 +153,8 @@ func TestLRURevisionCacheEviction(t *testing.T) { } assert.Equal(t, int64(10), cacheNumItems.Value()) assert.Equal(t, int64(20), memoryBytesCounted.Value()) + assert.Equal(t, 10, len(cache.cache)) + assert.Equal(t, 10, len(cache.hlvCache)) // Add 3 more docs to the now full revcache for i := 10; i < 13; 
i++ { @@ -159,6 +164,8 @@ func TestLRURevisionCacheEviction(t *testing.T) { } assert.Equal(t, int64(10), cacheNumItems.Value()) assert.Equal(t, int64(20), memoryBytesCounted.Value()) + assert.Equal(t, 10, len(cache.cache)) + assert.Equal(t, 10, len(cache.hlvCache)) // Check that the first 3 docs were evicted prevCacheHitCount := cacheHitCounter.Value() @@ -172,6 +179,8 @@ func TestLRURevisionCacheEviction(t *testing.T) { } assert.Equal(t, int64(10), cacheNumItems.Value()) assert.Equal(t, int64(20), memoryBytesCounted.Value()) + assert.Equal(t, 10, len(cache.cache)) + assert.Equal(t, 10, len(cache.hlvCache)) // and check we can Get up to and including the last 3 we put in for i := 0; i < 10; i++ { @@ -259,132 +268,198 @@ func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { } func TestLRURevisionCacheEvictionMemoryBased(t *testing.T) { - dbcOptions := DatabaseContextOptions{ - RevisionCacheOptions: &RevisionCacheOptions{ - MaxBytes: 725, - MaxItemCount: 10, + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, }, } - db, ctx := SetupTestDBWithOptions(t, dbcOptions) - defer db.Close(ctx) - collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - cacheStats := db.DbStats.Cache() - - smallBody := Body{ - "channels": "_default", // add channel for default sync func in default collection test runs + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxBytes: 725, + MaxItemCount: 10, + }, + } + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + cacheStats := db.DbStats.Cache() + + smallBody := Body{ + "channels": "_default", // add channel for default sync func in default collection test runs + } + + var currMem, expValue, 
revZeroSize int64 + var rev1Version *Version + for i := 0; i < 10; i++ { + currMem = cacheStats.RevisionCacheTotalMemory.Value() + revSize, _, docVersion := createDocAndReturnSizeAndRev(t, ctx, fmt.Sprint(i), collection, smallBody) + if i == 0 { + revZeroSize = int64(revSize) + } + if i == 1 { + rev1Version = docVersion + } + expValue = currMem + int64(revSize) + assert.Equal(t, expValue, cacheStats.RevisionCacheTotalMemory.Value()) + } + + // test eviction by number of items (adding new doc from createDocAndReturnSizeAndRev shouldn't take memory over threshold defined as 730 bytes) + expValue -= revZeroSize // for doc being evicted + docSize, rev, _ := createDocAndReturnSizeAndRev(t, ctx, fmt.Sprint(11), collection, smallBody) + expValue += int64(docSize) + // assert doc 0 been evicted + docRev, ok := db.revisionCache.Peek(ctx, "0", rev, collection.GetCollectionID()) + assert.False(t, ok) + assert.Nil(t, docRev.BodyBytes) + + currMem = cacheStats.RevisionCacheTotalMemory.Value() + // assert total memory is as expected + assert.Equal(t, expValue, currMem) + + // remove doc "1" to give headroom for memory based eviction + if testCase.UseCVCache { + db.revisionCache.RemoveWithCV(ctx, "1", rev1Version, collection.GetCollectionID()) + } else { + db.revisionCache.RemoveWithRev(ctx, "1", rev, collection.GetCollectionID()) + } + docRev, ok = db.revisionCache.Peek(ctx, "1", rev, collection.GetCollectionID()) + assert.False(t, ok) + assert.Nil(t, docRev.BodyBytes) + + // assert current memory from rev cache decreases by the doc size (all docs added thus far are same size) + afterRemoval := currMem - int64(docSize) + assert.Equal(t, afterRemoval, cacheStats.RevisionCacheTotalMemory.Value()) + + // add new doc that will trigger eviction due to taking over memory size + largeBody := Body{ + "type": "test", + "doc": "testDocument", + "foo": "bar", + "lets": "test", + "larger": "document", + "for": "eviction", + "channels": "_default", // add channel for default sync func in 
default collection test runs + } + _, _, err := collection.Put(ctx, "12", largeBody) + require.NoError(t, err) + + // assert doc "2" has been evicted even though we only have 9 items in cache with capacity of 10, so memory based + // eviction took place + docRev, ok = db.revisionCache.Peek(ctx, "2", rev, collection.GetCollectionID()) + assert.False(t, ok) + assert.Nil(t, docRev.BodyBytes) + + // assert that the overall memory for rev cache is not over maximum + assert.LessOrEqual(t, cacheStats.RevisionCacheTotalMemory.Value(), dbcOptions.RevisionCacheOptions.MaxBytes) + }) } - var currMem, expValue, revZeroSize int64 - for i := 0; i < 10; i++ { - currMem = cacheStats.RevisionCacheTotalMemory.Value() - revSize, _ := createDocAndReturnSizeAndRev(t, ctx, fmt.Sprint(i), collection, smallBody) - if i == 0 { - revZeroSize = int64(revSize) - } - expValue = currMem + int64(revSize) - assert.Equal(t, expValue, cacheStats.RevisionCacheTotalMemory.Value()) - } - - // test eviction by number of items (adding new doc from createDocAndReturnSizeAndRev shouldn't take memory over threshold defined as 730 bytes) - expValue -= revZeroSize // for doc being evicted - docSize, rev := createDocAndReturnSizeAndRev(t, ctx, fmt.Sprint(11), collection, smallBody) - expValue += int64(docSize) - // assert doc 0 been evicted - docRev, ok := db.revisionCache.Peek(ctx, "0", rev, collection.GetCollectionID()) - assert.False(t, ok) - assert.Nil(t, docRev.BodyBytes) - - currMem = cacheStats.RevisionCacheTotalMemory.Value() - // assert total memory is as expected - assert.Equal(t, expValue, currMem) - - // remove doc "1" to give headroom for memory based eviction - db.revisionCache.RemoveWithRev("1", rev, collection.GetCollectionID()) - docRev, ok = db.revisionCache.Peek(ctx, "1", rev, collection.GetCollectionID()) - assert.False(t, ok) - assert.Nil(t, docRev.BodyBytes) - - // assert current memory from rev cache decreases by the doc size (all docs added thus far are same size) - afterRemoval := 
currMem - int64(docSize) - assert.Equal(t, afterRemoval, cacheStats.RevisionCacheTotalMemory.Value()) - - // add new doc that will trigger eviction due to taking over memory size - largeBody := Body{ - "type": "test", - "doc": "testDocument", - "foo": "bar", - "lets": "test", - "larger": "document", - "for": "eviction", - "channels": "_default", // add channel for default sync func in default collection test runs - } - _, _, err := collection.Put(ctx, "12", largeBody) - require.NoError(t, err) - - // assert doc "2" has been evicted even though we only have 9 items in cache with capacity of 10, so memory based - // eviction took place - docRev, ok = db.revisionCache.Peek(ctx, "2", rev, collection.GetCollectionID()) - assert.False(t, ok) - assert.Nil(t, docRev.BodyBytes) - - // assert that the overall memory for rev cache is not over maximum - assert.LessOrEqual(t, cacheStats.RevisionCacheTotalMemory.Value(), dbcOptions.RevisionCacheOptions.MaxBytes) } func TestBackingStoreMemoryCalculation(t *testing.T) { - cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} - backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{[]string{"doc2"}, &getDocumentCounter, &getRevisionCounter}, testCollectionID) - cacheOptions := &RevisionCacheOptions{ - MaxItemCount: 10, - MaxBytes: 205, + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + backingStoreMap := 
CreateTestSingleBackingStoreMap(&testBackingStore{[]string{"doc2"}, &getDocumentCounter, &getRevisionCounter}, testCollectionID) + var maxBytes int64 + if testCase.UseCVCache { + maxBytes = 233 + } else { + maxBytes = 205 + } + cacheOptions := &RevisionCacheOptions{ + MaxItemCount: 10, + MaxBytes: maxBytes, + } + cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted) + ctx := base.TestCtx(t) + var err error + + var docRev DocumentRevision + if testCase.UseCVCache { + docRev, err = cache.GetWithCV(ctx, "doc1", &Version{Value: 123, SourceID: "test"}, testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } + assert.Equal(t, "doc1", docRev.DocID) + assert.NotNil(t, docRev.History) + assert.NotNil(t, docRev.Channels) + + currMemStat := memoryBytesCounted.Value() + // assert stats is incremented by appropriate bytes on doc rev + assert.Equal(t, docRev.MemoryBytes, currMemStat) + + // Test get active code pathway of a load from bucket + docRev, err = cache.GetActive(ctx, "doc", testCollectionID) + require.NoError(t, err) + assert.Equal(t, "doc", docRev.DocID) + assert.NotNil(t, docRev.History) + assert.NotNil(t, docRev.Channels) + + newMemStat := currMemStat + docRev.MemoryBytes + // assert stats is incremented by appropriate bytes on doc rev + assert.Equal(t, newMemStat, memoryBytesCounted.Value()) + + // test fail load event doesn't increment memory stat + if testCase.UseCVCache { + docRev, err = cache.GetWithCV(ctx, "doc2", &Version{Value: 123, SourceID: "test"}, testCollectionID, RevCacheOmitDelta) + assertHTTPError(t, err, 404) + } else { + docRev, err = cache.GetWithRev(ctx, "doc2", "1-abc", testCollectionID, RevCacheOmitDelta) + assertHTTPError(t, err, 404) + } + assert.Nil(t, docRev.BodyBytes) + assert.Equal(t, newMemStat, 
memoryBytesCounted.Value()) + + // assert length is 2 as expected + assert.Equal(t, 2, cache.lruList.Len()) + + memStatBeforeThirdLoad := memoryBytesCounted.Value() + // test another load from bucket but doing so should trigger memory based eviction + if testCase.UseCVCache { + docRev, err = cache.GetWithCV(ctx, "doc3", &Version{Value: 123, SourceID: "test"}, testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc3", "1-abc", testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } + assert.Equal(t, "doc3", docRev.DocID) + assert.NotNil(t, docRev.History) + assert.NotNil(t, docRev.Channels) + + // assert length is still 2 (eviction took place) + test Peek for first added doc is failure + assert.Equal(t, 2, cache.lruList.Len()) + memStatAfterEviction := (memStatBeforeThirdLoad + docRev.MemoryBytes) - currMemStat + assert.Equal(t, memStatAfterEviction, memoryBytesCounted.Value()) + _, ok := cache.Peek(ctx, "doc1", "1-abc", testCollectionID) + assert.False(t, ok) + }) } - cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted) - ctx := base.TestCtx(t) - - docRev, err := cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta) - require.NoError(t, err) - assert.Equal(t, "doc1", docRev.DocID) - assert.NotNil(t, docRev.History) - assert.NotNil(t, docRev.Channels) - - currMemStat := memoryBytesCounted.Value() - // assert stats is incremented by appropriate bytes on doc rev - assert.Equal(t, docRev.MemoryBytes, currMemStat) - - // Test get active code pathway of a load from bucket - docRev, err = cache.GetActive(ctx, "doc", testCollectionID) - require.NoError(t, err) - assert.Equal(t, "doc", docRev.DocID) - assert.NotNil(t, docRev.History) - assert.NotNil(t, docRev.Channels) - - newMemStat := currMemStat + docRev.MemoryBytes - // assert stats is incremented by appropriate bytes on doc rev - assert.Equal(t, 
newMemStat, memoryBytesCounted.Value()) - - // test fail load event doesn't increment memory stat - docRev, err = cache.GetWithRev(ctx, "doc2", "1-abc", testCollectionID, RevCacheOmitDelta) - assertHTTPError(t, err, 404) - assert.Nil(t, docRev.BodyBytes) - assert.Equal(t, newMemStat, memoryBytesCounted.Value()) - - // assert length is 2 as expected - assert.Equal(t, 2, cache.lruList.Len()) - - memStatBeforeThirdLoad := memoryBytesCounted.Value() - // test another load from bucket but doing so should trigger memory based eviction - docRev, err = cache.GetWithRev(ctx, "doc3", "1-abc", testCollectionID, RevCacheOmitDelta) - require.NoError(t, err) - assert.Equal(t, "doc3", docRev.DocID) - assert.NotNil(t, docRev.History) - assert.NotNil(t, docRev.Channels) - - // assert length is still 2 (eviction took place) + test Peek for first added doc is failure - assert.Equal(t, 2, cache.lruList.Len()) - memStatAfterEviction := (memStatBeforeThirdLoad + docRev.MemoryBytes) - currMemStat - assert.Equal(t, memStatAfterEviction, memoryBytesCounted.Value()) - _, ok := cache.Peek(ctx, "doc1", "1-abc", testCollectionID) - assert.False(t, ok) } func TestBackingStore(t *testing.T) { @@ -744,89 +819,139 @@ func TestRevisionImmutableDelta(t *testing.T) { } func TestUpdateDeltaRevCacheMemoryStat(t *testing.T) { - cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} - backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID) - cacheOptions := &RevisionCacheOptions{ - MaxItemCount: 10, - MaxBytes: 125, + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, 
func(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID) + cacheOptions := &RevisionCacheOptions{ + MaxItemCount: 10, + MaxBytes: 125, + } + cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted) + + firstDelta := []byte("delta") + secondDelta := []byte("modified delta") + thirdDelta := []byte("another delta further modified") + ctx := base.TestCtx(t) + + // Trigger load into cache + docRev, err := cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta) + assert.NoError(t, err, "Error adding to cache") + + revCacheMem := memoryBytesCounted.Value() + revCacheDelta := newRevCacheDelta(firstDelta, "1-abc", docRev, false, nil) + cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta) + // assert that rev cache memory increases by expected amount + newMem := revCacheMem + revCacheDelta.totalDeltaBytes + assert.Equal(t, newMem, memoryBytesCounted.Value()) + oldDeltaSize := revCacheDelta.totalDeltaBytes + + newMem = memoryBytesCounted.Value() + revCacheDelta = newRevCacheDelta(secondDelta, "1-abc", docRev, false, nil) + cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta) + + // assert the overall memory stat is correctly updated (by the diff between the old delta and the new delta) + newMem += revCacheDelta.totalDeltaBytes - oldDeltaSize + assert.Equal(t, newMem, memoryBytesCounted.Value()) + + revCacheDelta = newRevCacheDelta(thirdDelta, "1-abc", docRev, false, nil) + cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta) + + // assert that eviction took place and as result stat is now 0 (only 
item in cache was doc1) + assert.Equal(t, int64(0), memoryBytesCounted.Value()) + assert.Equal(t, 0, cache.lruList.Len()) + }) } - cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted) - - firstDelta := []byte("delta") - secondDelta := []byte("modified delta") - thirdDelta := []byte("another delta further modified") - ctx := base.TestCtx(t) - - // Trigger load into cache - docRev, err := cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta) - assert.NoError(t, err, "Error adding to cache") - - revCacheMem := memoryBytesCounted.Value() - revCacheDelta := newRevCacheDelta(firstDelta, "1-abc", docRev, false, nil) - cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta) - // assert that rev cache memory increases by expected amount - newMem := revCacheMem + revCacheDelta.totalDeltaBytes - assert.Equal(t, newMem, memoryBytesCounted.Value()) - oldDeltaSize := revCacheDelta.totalDeltaBytes - - newMem = memoryBytesCounted.Value() - revCacheDelta = newRevCacheDelta(secondDelta, "1-abc", docRev, false, nil) - cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta) - - // assert the overall memory stat is correctly updated (by the diff between the old delta and the new delta) - newMem += revCacheDelta.totalDeltaBytes - oldDeltaSize - assert.Equal(t, newMem, memoryBytesCounted.Value()) - - revCacheDelta = newRevCacheDelta(thirdDelta, "1-abc", docRev, false, nil) - cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta) - - // assert that eviction took place and as result stat is now 0 (only item in cache was doc1) - assert.Equal(t, int64(0), memoryBytesCounted.Value()) - assert.Equal(t, 0, cache.lruList.Len()) } func TestImmediateRevCacheMemoryBasedEviction(t *testing.T) { - cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, 
base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} - backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID) - cacheOptions := &RevisionCacheOptions{ - MaxItemCount: 10, - MaxBytes: 10, + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID) + cacheOptions := &RevisionCacheOptions{ + MaxItemCount: 10, + MaxBytes: 10, + } + ctx := base.TestCtx(t) + var err error + cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted) + + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc1", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) + + assert.Equal(t, int64(0), memoryBytesCounted.Value()) + assert.Equal(t, int64(0), cacheNumItems.Value()) + + cache.Upsert(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc2", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) + + assert.Equal(t, int64(0), memoryBytesCounted.Value()) + assert.Equal(t, int64(0), cacheNumItems.Value()) + + // assert we can still fetch this upsert doc + var docRev DocumentRevision + if testCase.UseCVCache { + docRev, err = cache.GetWithCV(ctx, "doc2", &Version{Value: 123, SourceID: "test"}, 
testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc2", "1-abc", testCollectionID, false) + require.NoError(t, err) + } + assert.Equal(t, "doc2", docRev.DocID) + if testCase.UseCVCache { + assert.Equal(t, int64(130), docRev.MemoryBytes) + } else { + assert.Equal(t, int64(102), docRev.MemoryBytes) + } + assert.NotNil(t, docRev.BodyBytes) + assert.Equal(t, int64(0), memoryBytesCounted.Value()) + assert.Equal(t, int64(0), cacheNumItems.Value()) + + if testCase.UseCVCache { + docRev, err = cache.GetWithCV(ctx, "doc1", &Version{Value: 123, SourceID: "test"}, testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } + assert.NotNil(t, docRev.BodyBytes) + + assert.Equal(t, int64(0), memoryBytesCounted.Value()) + assert.Equal(t, int64(0), cacheNumItems.Value()) + + docRev, err = cache.GetActive(ctx, "doc1", testCollectionID) + require.NoError(t, err) + assert.NotNil(t, docRev.BodyBytes) + + assert.Equal(t, int64(0), memoryBytesCounted.Value()) + assert.Equal(t, int64(0), cacheNumItems.Value()) + }) } - ctx := base.TestCtx(t) - cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted) - - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc1", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) - - assert.Equal(t, int64(0), memoryBytesCounted.Value()) - assert.Equal(t, int64(0), cacheNumItems.Value()) - - cache.Upsert(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc2", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) - - assert.Equal(t, int64(0), memoryBytesCounted.Value()) - assert.Equal(t, int64(0), cacheNumItems.Value()) - - // 
assert we can still fetch this upsert doc - docRev, err := cache.GetWithRev(ctx, "doc2", "1-abc", testCollectionID, false) - require.NoError(t, err) - assert.Equal(t, "doc2", docRev.DocID) - assert.Equal(t, int64(102), docRev.MemoryBytes) - assert.NotNil(t, docRev.BodyBytes) - assert.Equal(t, int64(0), memoryBytesCounted.Value()) - assert.Equal(t, int64(0), cacheNumItems.Value()) - - docRev, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta) - require.NoError(t, err) - assert.NotNil(t, docRev.BodyBytes) - - assert.Equal(t, int64(0), memoryBytesCounted.Value()) - assert.Equal(t, int64(0), cacheNumItems.Value()) - - docRev, err = cache.GetActive(ctx, "doc1", testCollectionID) - require.NoError(t, err) - assert.NotNil(t, docRev.BodyBytes) - - assert.Equal(t, int64(0), memoryBytesCounted.Value()) - assert.Equal(t, int64(0), cacheNumItems.Value()) } // TestShardedMemoryEviction: @@ -852,7 +977,7 @@ func TestShardedMemoryEviction(t *testing.T) { } // add doc that will be added to one shard - size, _ := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, docBody) + size, _, _ := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, docBody) assert.Equal(t, int64(size), cacheStats.RevisionCacheTotalMemory.Value()) // grab this particular shard + assert that the shard memory usage is as expected shardedCache := db.revisionCache.(*ShardedLRURevisionCache) @@ -860,7 +985,7 @@ func TestShardedMemoryEviction(t *testing.T) { assert.Equal(t, int64(size), doc1Shard.currMemoryUsage.Value()) // add new doc in diff shard + assert that the shard memory usage is as expected - size, _ = createDocAndReturnSizeAndRev(t, ctx, "doc2", collection, docBody) + size, _, _ = createDocAndReturnSizeAndRev(t, ctx, "doc2", collection, docBody) doc2Shard := shardedCache.getShard("doc2") assert.Equal(t, int64(size), doc2Shard.currMemoryUsage.Value()) // overall mem usage should be combination oif the two added docs @@ -874,7 +999,7 @@ func 
TestShardedMemoryEviction(t *testing.T) { "some": "field", } // add new doc to trigger eviction and assert stats are as expected - newDocSize, _ := createDocAndReturnSizeAndRev(t, ctx, "doc3", collection, docBody) + newDocSize, _, _ := createDocAndReturnSizeAndRev(t, ctx, "doc3", collection, docBody) doc3Shard := shardedCache.getShard("doc3") assert.Equal(t, int64(newDocSize), doc3Shard.currMemoryUsage.Value()) assert.Equal(t, int64(2), cacheStats.RevisionCacheNumItems.Value()) @@ -925,41 +1050,71 @@ func TestShardedMemoryEvictionWhenShardEmpty(t *testing.T) { } func TestImmediateRevCacheItemBasedEviction(t *testing.T) { - cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} - backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID) - cacheOptions := &RevisionCacheOptions{ - MaxItemCount: 1, - MaxBytes: 0, // turn off memory based eviction + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID) + cacheOptions := &RevisionCacheOptions{ + MaxItemCount: 1, + MaxBytes: 0, // turn off memory based eviction + } + ctx := base.TestCtx(t) + var err error + + cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, 
&cacheNumItems, &memoryBytesCounted) + // load up item to hit max capacity + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc1", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) + + // eviction starts from here in test + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "newDoc", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) + + assert.Equal(t, int64(15), memoryBytesCounted.Value()) + assert.Equal(t, int64(1), cacheNumItems.Value()) + + cache.Upsert(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc2", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) + + assert.Equal(t, int64(15), memoryBytesCounted.Value()) + assert.Equal(t, int64(1), cacheNumItems.Value()) + + var docRev DocumentRevision + if testCase.UseCVCache { + docRev, err = cache.GetWithCV(ctx, "doc3", &Version{Value: 123, SourceID: "test"}, testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc3", "1-abc", testCollectionID, RevCacheOmitDelta) + require.NoError(t, err) + } + assert.NotNil(t, docRev.BodyBytes) + + // memory usage is higher for cv (body bytes on doc rev is higher given the cv definition) + if testCase.UseCVCache { + assert.Equal(t, int64(130), memoryBytesCounted.Value()) + } else { + assert.Equal(t, int64(102), memoryBytesCounted.Value()) + } + assert.Equal(t, int64(1), cacheNumItems.Value()) + + docRev, err = cache.GetActive(ctx, "doc4", testCollectionID) + require.NoError(t, err) + assert.NotNil(t, docRev.BodyBytes) + + assert.Equal(t, int64(102), memoryBytesCounted.Value()) + assert.Equal(t, int64(1), cacheNumItems.Value()) + }) } - ctx := base.TestCtx(t) - cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, 
&cacheNumItems, &memoryBytesCounted) - // load up item to hit max capacity - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc1", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) - - // eviction starts from here in test - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "newDoc", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) - - assert.Equal(t, int64(15), memoryBytesCounted.Value()) - assert.Equal(t, int64(1), cacheNumItems.Value()) - - cache.Upsert(ctx, DocumentRevision{BodyBytes: []byte(`{"some":"test"}`), DocID: "doc2", RevID: "1-abc", CV: &Version{Value: 123, SourceID: "test"}, History: Revisions{"start": 1}}, testCollectionID) - - assert.Equal(t, int64(15), memoryBytesCounted.Value()) - assert.Equal(t, int64(1), cacheNumItems.Value()) - - docRev, err := cache.GetWithRev(ctx, "doc3", "1-abc", testCollectionID, RevCacheOmitDelta) - require.NoError(t, err) - assert.NotNil(t, docRev.BodyBytes) - - assert.Equal(t, int64(102), memoryBytesCounted.Value()) - assert.Equal(t, int64(1), cacheNumItems.Value()) - - docRev, err = cache.GetActive(ctx, "doc4", testCollectionID) - require.NoError(t, err) - assert.NotNil(t, docRev.BodyBytes) - - assert.Equal(t, int64(102), memoryBytesCounted.Value()) - assert.Equal(t, int64(1), cacheNumItems.Value()) } func TestResetRevCache(t *testing.T) { @@ -975,7 +1130,7 @@ func TestResetRevCache(t *testing.T) { cacheStats := db.DbStats.Cache() // add a doc - docSize, _ := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, Body{"test": "doc"}) + docSize, _, _ := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, Body{"test": "doc"}) assert.Equal(t, int64(docSize), cacheStats.RevisionCacheTotalMemory.Value()) assert.Equal(t, int64(1), cacheStats.RevisionCacheNumItems.Value()) @@ -988,106 +1143,156 @@ func TestResetRevCache(t *testing.T) { } func 
TestBasicOperationsOnCacheWithMemoryStat(t *testing.T) { - dbcOptions := DatabaseContextOptions{ - RevisionCacheOptions: &RevisionCacheOptions{ - MaxBytes: 730, - MaxItemCount: 10, + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, }, } - db, ctx := SetupTestDBWithOptions(t, dbcOptions) - defer db.Close(ctx) - collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - cacheStats := db.DbStats.Cache() - collctionID := collection.GetCollectionID() - - // Test Put on new doc - docSize, revID := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, Body{"test": "doc"}) - assert.Equal(t, int64(docSize), cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Get with item in the cache - docRev, err := db.revisionCache.GetWithRev(ctx, "doc1", revID, collctionID, RevCacheOmitDelta) - require.NoError(t, err) - assert.NotNil(t, docRev.BodyBytes) - assert.Equal(t, int64(docSize), cacheStats.RevisionCacheTotalMemory.Value()) - revIDDoc1 := docRev.RevID - - // Test Get operation with load from bucket, need to first create and remove from rev cache - prevMemStat := cacheStats.RevisionCacheTotalMemory.Value() - revDoc2 := createThenRemoveFromRevCache(t, ctx, "doc2", db, collection) - // load from doc from bucket - docRev, err = db.revisionCache.GetWithRev(ctx, "doc2", docRev.RevID, collctionID, RevCacheOmitDelta) - require.NoError(t, err) - assert.NotNil(t, docRev.BodyBytes) - assert.Equal(t, "doc2", docRev.DocID) - assert.Greater(t, cacheStats.RevisionCacheTotalMemory.Value(), prevMemStat) - - // Test Get active with item resident in cache - prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() - docRev, err = db.revisionCache.GetActive(ctx, "doc2", collctionID) - require.NoError(t, err) - assert.Equal(t, "doc2", docRev.DocID) - assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Get active with item to be 
loaded from bucket, need to first create and remove from rev cache - prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() - revDoc3 := createThenRemoveFromRevCache(t, ctx, "doc3", db, collection) - docRev, err = db.revisionCache.GetActive(ctx, "doc3", collctionID) - require.NoError(t, err) - assert.Equal(t, "doc3", docRev.DocID) - assert.Greater(t, cacheStats.RevisionCacheTotalMemory.Value(), prevMemStat) - - // Test Peek at item not in cache, assert stats unchanged - prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() - docRev, ok := db.revisionCache.Peek(ctx, "doc4", "1-abc", collctionID) - require.False(t, ok) - assert.Nil(t, docRev.BodyBytes) - assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxBytes: 730, + MaxItemCount: 10, + }, + } + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + cacheStats := db.DbStats.Cache() + collctionID := collection.GetCollectionID() + var err error + + // Test Put on new doc + docSize, revID, docCV := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, Body{"test": "doc"}) + assert.Equal(t, int64(docSize), cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Get with item in the cache + var docRev DocumentRevision + if testCase.UseCVCache { + docRev, err = db.revisionCache.GetWithCV(ctx, "doc1", docCV, collctionID, false) + require.NoError(t, err) + } else { + docRev, err = db.revisionCache.GetWithRev(ctx, "doc1", revID, collctionID, RevCacheOmitDelta) + require.NoError(t, err) + } + assert.NotNil(t, docRev.BodyBytes) + assert.Equal(t, int64(docSize), cacheStats.RevisionCacheTotalMemory.Value()) + revIDDoc1 := docRev.RevID + cvDoc1 := docRev.CV + + // Test Get operation with load from bucket, need to first create and remove 
from rev cache + prevMemStat := cacheStats.RevisionCacheTotalMemory.Value() + revDoc2 := createThenRemoveFromRevCache(t, ctx, "doc2", db, collection) + // load from doc from bucket + if testCase.UseCVCache { + docRev, err = db.revisionCache.GetWithCV(ctx, "doc2", &revDoc2.CV, collctionID, false) + require.NoError(t, err) + } else { + docRev, err = db.revisionCache.GetWithRev(ctx, "doc2", docRev.RevID, collctionID, RevCacheOmitDelta) + require.NoError(t, err) + } + assert.NotNil(t, docRev.BodyBytes) + assert.Equal(t, "doc2", docRev.DocID) + assert.Greater(t, cacheStats.RevisionCacheTotalMemory.Value(), prevMemStat) + + // Test Get active with item resident in cache + prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() + docRev, err = db.revisionCache.GetActive(ctx, "doc2", collctionID) + require.NoError(t, err) + assert.Equal(t, "doc2", docRev.DocID) + assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Get active with item to be loaded from bucket, need to first create and remove from rev cache + prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() + revDoc3 := createThenRemoveFromRevCache(t, ctx, "doc3", db, collection) + docRev, err = db.revisionCache.GetActive(ctx, "doc3", collctionID) + require.NoError(t, err) + assert.Equal(t, "doc3", docRev.DocID) + assert.Greater(t, cacheStats.RevisionCacheTotalMemory.Value(), prevMemStat) + + // Test Peek at item not in cache, assert stats unchanged + prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() + docRev, ok := db.revisionCache.Peek(ctx, "doc4", "1-abc", collctionID) + require.False(t, ok) + assert.Nil(t, docRev.BodyBytes) + assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Peek in cache, assert stat unchanged + docRev, ok = db.revisionCache.Peek(ctx, "doc3", revDoc3.RevTreeID, collctionID) + require.True(t, ok) + assert.Equal(t, "doc3", docRev.DocID) + assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) + + 
// Test Upsert with item in cache + assert stat is expected + docRev.CalculateBytes() + doc3Size := docRev.MemoryBytes + expMem := cacheStats.RevisionCacheTotalMemory.Value() - doc3Size + newDocRev := documentRevisionForCacheTestUpdate("doc3", `"some": "body"`, revDoc3) + + expMem = expMem + 14 // size for above doc rev + db.revisionCache.Upsert(ctx, newDocRev, collctionID) + assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Upsert with item not in cache, assert stat is as expected + newDocRev = documentRevisionForCacheTest("doc5", `"some": "body"`) + expMem = cacheStats.RevisionCacheTotalMemory.Value() + 14 // size for above doc rev + db.revisionCache.Upsert(ctx, newDocRev, collctionID) + assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Remove with something in cache, assert stat decrements by expected value + if testCase.UseCVCache { + db.revisionCache.RemoveWithCV(ctx, "doc5", newDocRev.CV, collctionID) + } else { + db.revisionCache.RemoveWithRev(ctx, "doc5", "1-abc", collctionID) + } + expMem -= 14 + assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Remove with item not in cache, assert stat is unchanged + prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() + if testCase.UseCVCache { + cv := Version{SourceID: "test", Value: 123} + db.revisionCache.RemoveWithCV(ctx, "doc6", &cv, collctionID) + } else { + db.revisionCache.RemoveWithRev(ctx, "doc6", "1-abc", collctionID) + } + assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) + + // Test Update Delta, assert stat increases as expected + revDelta := newRevCacheDelta([]byte(`"rev":"delta"`), "1-abc", newDocRev, false, nil) + expMem = prevMemStat + revDelta.totalDeltaBytes + if testCase.UseCVCache { + db.revisionCache.UpdateDeltaCV(ctx, "doc3", &revDoc3.CV, collctionID, revDelta) + } else { + db.revisionCache.UpdateDelta(ctx, "doc3", revDoc3.RevTreeID, collctionID, revDelta) + } + 
assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) + + // Empty cache and see memory stat is 0 + if testCase.UseCVCache { + db.revisionCache.RemoveWithCV(ctx, "doc3", &revDoc3.CV, collctionID) + db.revisionCache.RemoveWithCV(ctx, "doc2", &revDoc2.CV, collctionID) + db.revisionCache.RemoveWithCV(ctx, "doc1", cvDoc1, collctionID) + } else { + db.revisionCache.RemoveWithRev(ctx, "doc3", revDoc3.RevTreeID, collctionID) + db.revisionCache.RemoveWithRev(ctx, "doc2", revDoc2.RevTreeID, collctionID) + db.revisionCache.RemoveWithRev(ctx, "doc1", revIDDoc1, collctionID) + } + + assert.Equal(t, int64(0), cacheStats.RevisionCacheNumItems.Value()) + assert.Equal(t, int64(0), cacheStats.RevisionCacheTotalMemory.Value()) + }) + } - // Test Peek in cache, assert stat unchanged - docRev, ok = db.revisionCache.Peek(ctx, "doc3", revDoc3.RevTreeID, collctionID) - require.True(t, ok) - assert.Equal(t, "doc3", docRev.DocID) - assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Upsert with item in cache + assert stat is expected - docRev.CalculateBytes() - doc3Size := docRev.MemoryBytes - expMem := cacheStats.RevisionCacheTotalMemory.Value() - doc3Size - newDocRev := documentRevisionForCacheTestUpdate("doc3", `"some": "body"`, revDoc3) - - expMem = expMem + 14 // size for above doc rev - db.revisionCache.Upsert(ctx, newDocRev, collctionID) - assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Upsert with item not in cache, assert stat is as expected - newDocRev = documentRevisionForCacheTest("doc5", `"some": "body"`) - expMem = cacheStats.RevisionCacheTotalMemory.Value() + 14 // size for above doc rev - db.revisionCache.Upsert(ctx, newDocRev, collctionID) - assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Remove with something in cache, assert stat decrements by expected value - db.revisionCache.RemoveWithRev("doc5", "1-abc", collctionID) - expMem -= 14 - assert.Equal(t, 
expMem, cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Remove with item not in cache, assert stat is unchanged - prevMemStat = cacheStats.RevisionCacheTotalMemory.Value() - db.revisionCache.RemoveWithRev("doc6", "1-abc", collctionID) - assert.Equal(t, prevMemStat, cacheStats.RevisionCacheTotalMemory.Value()) - - // Test Update Delta, assert stat increases as expected - revDelta := newRevCacheDelta([]byte(`"rev":"delta"`), "1-abc", newDocRev, false, nil) - expMem = prevMemStat + revDelta.totalDeltaBytes - db.revisionCache.UpdateDelta(ctx, "doc3", revDoc3.RevTreeID, collctionID, revDelta) - assert.Equal(t, expMem, cacheStats.RevisionCacheTotalMemory.Value()) - - // Empty cache and see memory stat is 0 - db.revisionCache.RemoveWithRev("doc3", revDoc3.RevTreeID, collctionID) - db.revisionCache.RemoveWithRev("doc2", revDoc2.RevTreeID, collctionID) - db.revisionCache.RemoveWithRev("doc1", revIDDoc1, collctionID) - - // TODO: pending CBG-4135 assert rev cache had 0 items in it - assert.Equal(t, int64(0), cacheStats.RevisionCacheTotalMemory.Value()) } // Ensure subsequent updates to delta don't mutate previously retrieved deltas @@ -1146,7 +1351,7 @@ func TestRevisionCacheRemove(t *testing.T) { assert.Equal(t, rev1id, docRev.RevID) assert.Equal(t, int64(0), db.DbStats.Cache().RevisionCacheMisses.Value()) - collection.revisionCache.RemoveWithRev("doc", rev1id) + collection.revisionCache.RemoveWithRev(ctx, "doc", rev1id) docRev, err = collection.revisionCache.GetWithRev(base.TestCtx(t), "doc", rev1id, true) assert.NoError(t, err) @@ -1281,99 +1486,189 @@ func TestRevCacheHitMultiCollectionLoadFromBucket(t *testing.T) { } func TestRevCacheCapacityStat(t *testing.T) { - cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, cacheMemoryStat := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} - backingStoreMap := 
CreateTestSingleBackingStoreMap(&testBackingStore{[]string{"badDoc"}, &getDocumentCounter, &getRevisionCounter}, testCollectionID) - cacheOptions := &RevisionCacheOptions{ - MaxItemCount: 4, - MaxBytes: 0, + testCases := []struct { + name string + UseCVCache bool + }{ + { + name: "Rev cache pathway", + UseCVCache: false, + }, + { + name: "CV cache pathway", + UseCVCache: true, + }, } - cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &cacheMemoryStat) - ctx := base.TestCtx(t) - - assert.Equal(t, int64(0), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Create a new doc + asert num items increments - cache.Put(ctx, documentRevisionForCacheTest("doc1", `{"test":"1234"}`), testCollectionID) - assert.Equal(t, int64(1), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // test not found doc, assert that the stat isn't incremented - _, err := cache.GetWithRev(ctx, "badDoc", "1-abc", testCollectionID, false) - require.Error(t, err) - assert.Equal(t, int64(1), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, cacheMemoryStat := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{[]string{"badDoc"}, &getDocumentCounter, &getRevisionCounter}, testCollectionID) + cacheOptions := &RevisionCacheOptions{ + MaxItemCount: 4, + MaxBytes: 0, + } + cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &cacheMemoryStat) + ctx := base.TestCtx(t) + var err error + + assert.Equal(t, int64(0), cacheNumItems.Value()) + assert.Equal(t, 
int64(len(cache.cache)), cacheNumItems.Value()) + + // Create a new doc + asert num items increments + cache.Put(ctx, documentRevisionForCacheTest("doc1", `{"test":"1234"}`), testCollectionID) + assert.Equal(t, int64(1), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // test not found doc, assert that the stat isn't incremented + if testCase.UseCVCache { + cv := Version{SourceID: "test", Value: 123} + _, err = cache.GetWithCV(ctx, "badDoc", &cv, testCollectionID, false) + require.Error(t, err) + } else { + _, err = cache.GetWithRev(ctx, "badDoc", "1-abc", testCollectionID, false) + require.Error(t, err) + } + assert.Equal(t, int64(1), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Get on a doc that doesn't exist in cache, assert num items increments + var docRev DocumentRevision + if testCase.UseCVCache { + cv := Version{SourceID: "test", Value: 123} + docRev, err = cache.GetWithCV(ctx, "doc2", &cv, testCollectionID, false) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc2", "1-abc", testCollectionID, false) + require.NoError(t, err) + } + assert.Equal(t, "doc2", docRev.DocID) + assert.Equal(t, int64(2), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Get on item in cache, assert num items remains the same + if testCase.UseCVCache { + cv := Version{SourceID: "test", Value: 123} + docRev, err = cache.GetWithCV(ctx, "doc1", &cv, testCollectionID, false) + require.NoError(t, err) + } else { + docRev, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, false) + require.NoError(t, err) + } + assert.Equal(t, "doc1", docRev.DocID) + assert.Equal(t, int64(2), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Get Active on doc not in cache, assert num items increments + docRev, err = cache.GetActive(ctx, "doc3", testCollectionID) + 
require.NoError(t, err) + assert.Equal(t, "doc3", docRev.DocID) + assert.Equal(t, int64(3), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Get Active on doc in the cache, assert that the num items stat remains unchanged + docRev, err = cache.GetActive(ctx, "doc1", testCollectionID) + require.NoError(t, err) + assert.Equal(t, "doc1", docRev.DocID) + assert.Equal(t, int64(3), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Upsert a doc resident in cache, assert stat is unchanged + cache.Upsert(ctx, documentRevisionForCacheTest("doc1", `{"test":"12345"}`), testCollectionID) + assert.Equal(t, int64(3), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Upsert new doc, assert the num items stat increments + cache.Upsert(ctx, documentRevisionForCacheTest("doc4", `{"test":"1234}`), testCollectionID) + assert.Equal(t, int64(4), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Peek a doc that is resident in cache, assert stat is unchanged + docRev, ok := cache.Peek(ctx, "doc4", "1-abc", testCollectionID) + require.True(t, ok) + assert.Equal(t, "doc4", docRev.DocID) + assert.Equal(t, int64(4), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Peek a doc that is not resident in cache, assert stat is unchanged + docRev, ok = cache.Peek(ctx, "doc5", "1-abc", testCollectionID) + require.False(t, ok) + assert.Equal(t, int64(4), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Eviction situation and assert stat doesn't go over the capacity set + cache.Put(ctx, documentRevisionForCacheTest("doc5", `{"test":"1234"}`), testCollectionID) + assert.Equal(t, int64(4), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // test case of eviction for upsert + 
cache.Upsert(ctx, documentRevisionForCacheTest("doc6", `{"test":"12345"}`), testCollectionID) + assert.Equal(t, int64(4), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + + // Empty cache + if testCase.UseCVCache { + cv := Version{SourceID: "test", Value: 123} + cache.RemoveWithCV(ctx, "doc1", &cv, testCollectionID) + cache.RemoveWithCV(ctx, "doc4", &cv, testCollectionID) + cache.RemoveWithCV(ctx, "doc5", &cv, testCollectionID) + cache.RemoveWithCV(ctx, "doc6", &cv, testCollectionID) + } else { + cache.RemoveWithRev(ctx, "doc1", "1-abc", testCollectionID) + cache.RemoveWithRev(ctx, "doc4", "1-abc", testCollectionID) + cache.RemoveWithRev(ctx, "doc5", "1-abc", testCollectionID) + cache.RemoveWithRev(ctx, "doc6", "1-abc", testCollectionID) + } + + // Assert num items goes back to 0 + assert.Equal(t, int64(0), cacheNumItems.Value()) + assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + }) + } +} - // Get on a doc that doesn't exist in cache, assert num items increments - docRev, err := cache.GetWithRev(ctx, "doc2", "1-abc", testCollectionID, false) - require.NoError(t, err) - assert.Equal(t, "doc2", docRev.DocID) - assert.Equal(t, int64(2), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) +func TestRevCacheOnDemand(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) - // Get on item in cache, assert num items remains the same - docRev, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, false) + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxItemCount: 2, + ShardCount: 1, + }, + } + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + docID := "doc1" + revID, _, err := collection.Put(ctx, docID, Body{"ver": "1"}) require.NoError(t, err) - assert.Equal(t, "doc1", docRev.DocID) - assert.Equal(t, int64(2), cacheNumItems.Value()) - 
assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - // Get Active on doc not in cache, assert num items increments - docRev, err = cache.GetActive(ctx, "doc3", testCollectionID) - require.NoError(t, err) - assert.Equal(t, "doc3", docRev.DocID) - assert.Equal(t, int64(3), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t)) + defer testCtxCancel() - // Get Active on doc in the cache, assert that the num items stat remains unchanged - docRev, err = cache.GetActive(ctx, "doc1", testCollectionID) + for i := 0; i < 2; i++ { + docID := fmt.Sprintf("extraDoc%d", i) + revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"}) + require.NoError(t, err) + go func() { + for { + select { + case <-testCtx.Done(): + return + default: + _, err = db.revisionCache.GetWithRev(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck + } + } + }() + } + log.Printf("Updating doc to trigger on-demand import") + err = collection.dataStore.Set(docID, 0, nil, []byte(`{"ver": "2"}`)) require.NoError(t, err) - assert.Equal(t, "doc1", docRev.DocID) - assert.Equal(t, int64(3), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Upsert a doc resident in cache, assert stat is unchanged - cache.Upsert(ctx, documentRevisionForCacheTest("doc1", `{"test":"12345"}`), testCollectionID) - assert.Equal(t, int64(3), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Upsert new doc, assert the num items stat increments - cache.Upsert(ctx, documentRevisionForCacheTest("doc4", `{"test":"1234}`), testCollectionID) - assert.Equal(t, int64(4), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Peek a doc that is resident in cache, assert stat is unchanged - docRev, ok := cache.Peek(ctx, "doc4", "1-abc", testCollectionID) - 
require.True(t, ok) - assert.Equal(t, "doc4", docRev.DocID) - assert.Equal(t, int64(4), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Peek a doc that is not resident in cache, assert stat is unchanged - docRev, ok = cache.Peek(ctx, "doc5", "1-abc", testCollectionID) - require.False(t, ok) - assert.Equal(t, int64(4), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Eviction situation and assert stat doesn't go over the capacity set - cache.Put(ctx, documentRevisionForCacheTest("doc5", `{"test":"1234"}`), testCollectionID) - assert.Equal(t, int64(4), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // test case of eviction for upsert - cache.Upsert(ctx, documentRevisionForCacheTest("doc6", `{"test":"12345"}`), testCollectionID) - assert.Equal(t, int64(4), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) - - // Empty cache - cache.RemoveWithRev("doc1", "1-abc", testCollectionID) - cache.RemoveWithRev("doc4", "1-abc", testCollectionID) - cache.RemoveWithRev("doc5", "1-abc", testCollectionID) - cache.RemoveWithRev("doc6", "1-abc", testCollectionID) - - // Assert num items goes back to 0 - assert.Equal(t, int64(0), cacheNumItems.Value()) - assert.Equal(t, int64(len(cache.cache)), cacheNumItems.Value()) + log.Printf("Calling getRev for %s, %s", docID, revID) + rev, err := collection.getRev(ctx, docID, revID, 0, nil) + require.Error(t, err) + require.ErrorContains(t, err, "missing") + // returns empty doc rev + assert.Equal(t, "", rev.DocID) } // documentRevisionForCacheTest creates a document revision with the specified body and key, and a hardcoded revID, cv and history: @@ -1457,7 +1752,7 @@ func TestRevCacheOperationsCV(t *testing.T) { assert.Equal(t, int64(0), cacheMissCounter.Value()) // remove the doc rev from the cache and assert that the document is no longer present in cache - 
cache.RemoveWithCV("doc1", &cv, testCollectionID) + cache.RemoveWithCV(base.TestCtx(t), "doc1", &cv, testCollectionID) assert.Equal(t, 0, len(cache.cache)) assert.Equal(t, 0, len(cache.hlvCache)) assert.Equal(t, 0, cache.lruList.Len()) @@ -1496,7 +1791,7 @@ func createThenRemoveFromRevCache(t *testing.T, ctx context.Context, docID strin revIDDoc, doc, err := collection.Put(ctx, docID, Body{"test": "doc"}) require.NoError(t, err) - db.revisionCache.RemoveWithRev(docID, revIDDoc, collection.GetCollectionID()) + db.revisionCache.RemoveWithRev(ctx, docID, revIDDoc, collection.GetCollectionID()) docVersion := DocVersion{ RevTreeID: doc.CurrentRev, } @@ -1507,7 +1802,7 @@ func createThenRemoveFromRevCache(t *testing.T, ctx context.Context, docID strin } // createDocAndReturnSizeAndRev creates a rev and measures its size based on rev cache measurements -func createDocAndReturnSizeAndRev(t *testing.T, ctx context.Context, docID string, collection *DatabaseCollectionWithUser, body Body) (int, string) { +func createDocAndReturnSizeAndRev(t *testing.T, ctx context.Context, docID string, collection *DatabaseCollectionWithUser, body Body) (int, string, *Version) { rev, doc, err := collection.Put(ctx, docID, body) require.NoError(t, err) @@ -1526,7 +1821,124 @@ func createDocAndReturnSizeAndRev(t *testing.T, ctx context.Context, docID strin expectedSize += len([]byte(v)) } - return expectedSize, rev + return expectedSize, rev, doc.HLV.ExtractCurrentVersionFromHLV() +} + +func TestRevCacheOnDemandImport(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) + + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxItemCount: 2, + ShardCount: 1, + }, + } + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + docID := "doc1" + revID, _, err := collection.Put(ctx, docID, Body{"ver": "1"}) + require.NoError(t, err) + + ctx, testCtxCancel := 
context.WithCancel(ctx) + defer testCtxCancel() + + for i := 0; i < 2; i++ { + docID := fmt.Sprintf("extraDoc%d", i) + revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"}) + require.NoError(t, err) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + _, err = db.revisionCache.GetWithRev(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck + } + } + }() + } + err = collection.dataStore.Set(docID, 0, nil, []byte(`{"ver": "2"}`)) + require.NoError(t, err) + rev, err := collection.getRev(ctx, docID, revID, 0, nil) + require.Error(t, err) + require.ErrorContains(t, err, "missing") + // returns empty doc rev + assert.Equal(t, "", rev.DocID) +} + +func TestRevCacheOnDemandMemoryEviction(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) + + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxItemCount: 20, + ShardCount: 1, + MaxBytes: 112, // equivalent to max size 2 items + }, + } + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + docID := "doc1" + revID, _, err := collection.Put(ctx, docID, Body{"ver": "1"}) + require.NoError(t, err) + + testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t)) + defer testCtxCancel() + + for i := 0; i < 2; i++ { + docID := fmt.Sprintf("extraDoc%d", i) + revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"}) + require.NoError(t, err) + go func() { + for { + select { + case <-testCtx.Done(): + return + default: + _, err = db.revisionCache.GetWithRev(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck + } + } + }() + } + err = collection.dataStore.Set(docID, 0, nil, []byte(`{"ver": "2"}`)) + require.NoError(t, err) + rev, err := collection.getRev(ctx, docID, revID, 0, nil) + require.Error(t, err) + require.ErrorContains(t, err, "missing") + // returns empty doc rev + assert.Equal(t, "", 
rev.DocID) + +} + +func TestRevCacheOnDemandImportNoCache(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) + + db, ctx := setupTestDB(t) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + docID := "doc1" + revID1, _, err := collection.Put(ctx, docID, Body{"foo": "bar"}) + require.NoError(t, err) + + _, exists := collection.revisionCache.Peek(ctx, docID, revID1) + require.True(t, exists) + + require.NoError(t, collection.dataStore.Set(docID, 0, nil, []byte(`{"foo": "baz"}`))) + + doc, err := collection.GetDocument(ctx, docID, DocUnmarshalSync) + require.NoError(t, err) + require.Equal(t, Body{"foo": "baz"}, doc.Body(ctx)) + + // rev1 still exists in cache but not on server + _, exists = collection.revisionCache.Peek(ctx, docID, revID1) + require.True(t, exists) + + // rev2 is not in cache but is on server + _, exists = collection.revisionCache.Peek(ctx, docID, doc.CurrentRev) + require.False(t, exists) } // TestLoaderMismatchInCV: @@ -1620,7 +2032,7 @@ func TestGetActive(t *testing.T) { } // remove the entry form the rev cache to force the cache to not have the active version in it - collection.revisionCache.RemoveWithCV("doc", &expectedCV) + collection.revisionCache.RemoveWithCV(ctx, "doc", &expectedCV) // call get active to get the active version from the bucket docRev, err := collection.revisionCache.GetActive(base.TestCtx(t), "doc") @@ -1680,3 +2092,150 @@ func TestConcurrentPutAndGetOnRevCache(t *testing.T) { assert.Equal(t, cacheElem, cvElement) assert.Equal(t, cacheElem, revElement) } + +func TestLoadActiveDocFromBucketRevCacheChurn(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) + + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxItemCount: 2, + ShardCount: 1, + }, + } + var wg sync.WaitGroup + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + docID := "doc1" + _, _, err 
:= collection.Put(ctx, docID, Body{"ver": "0"}) + require.NoError(t, err) + wg.Add(1) + + testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t)) + defer testCtxCancel() + + for i := 0; i < 2; i++ { + docID := fmt.Sprintf("extraDoc%d", i) + revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"}) + require.NoError(t, err) + go func() { + for { + select { + case <-testCtx.Done(): + return + default: + _, err = db.revisionCache.GetWithRev(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck + } + } + }() + } + + go func() { + for i := 0; i < 100; i++ { + err = collection.dataStore.Set(docID, 0, nil, []byte(fmt.Sprintf(`{"ver": "%d"}`, i))) + require.NoError(t, err) + _, err := db.revisionCache.GetActive(ctx, docID, collection.GetCollectionID()) + if err != nil { + break + } + } + wg.Done() + }() + wg.Wait() + require.NoError(t, err) +} + +func TestLoadRequestedRevFromBucketHighChurn(t *testing.T) { + + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxItemCount: 2, + ShardCount: 1, + }, + } + var wg sync.WaitGroup + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + docID := "doc1" + rev1ID, _, err := collection.Put(ctx, docID, Body{"ver": "0"}) + require.NoError(t, err) + wg.Add(1) + + testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t)) + defer testCtxCancel() + + for i := 0; i < 2; i++ { + docID := fmt.Sprintf("extraDoc%d", i) + revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"}) + require.NoError(t, err) + go func() { + for { + select { + case <-testCtx.Done(): + return + default: + _, err = db.revisionCache.GetWithRev(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck + } + } + }() + } + + var getErr error + go func() { + for i := 0; i < 100; i++ { + _, getErr = db.revisionCache.GetWithRev(ctx, docID, rev1ID, 
collection.GetCollectionID(), true) + if getErr != nil { + break + } + } + wg.Done() + }() + wg.Wait() + require.NoError(t, getErr) + +} + +func TestPutRevHighRevCacheChurn(t *testing.T) { + + dbcOptions := DatabaseContextOptions{ + RevisionCacheOptions: &RevisionCacheOptions{ + MaxItemCount: 2, + ShardCount: 1, + }, + } + var wg sync.WaitGroup + db, ctx := SetupTestDBWithOptions(t, dbcOptions) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + docID := "doc1" + wg.Add(1) + + testCtx, testCtxCancel := context.WithCancel(base.TestCtx(t)) + defer testCtxCancel() + + for i := 0; i < 2; i++ { + docID := fmt.Sprintf("extraDoc%d", i) + revID, _, err := collection.Put(ctx, docID, Body{"fake": "body"}) + require.NoError(t, err) + go func() { + for { + select { + case <-testCtx.Done(): + return + default: + _, err = db.revisionCache.GetWithRev(ctx, docID, revID, collection.GetCollectionID(), RevCacheOmitDelta) //nolint:errcheck + } + } + }() + } + + go func() { + for i := 0; i < 100; i++ { + docRev := DocumentRevision{DocID: docID, RevID: fmt.Sprintf("1-%d", i), CV: &Version{SourceID: "someSrc", Value: uint64(i)}, BodyBytes: []byte(fmt.Sprintf(`{"ver": "%d"}`, i)), History: Revisions{"start": 1}} + db.revisionCache.Put(ctx, docRev, collection.GetCollectionID()) + } + wg.Done() + }() + wg.Wait() +} diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index d6d4220fc1..b6b4b7370e 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -1610,7 +1610,7 @@ func TestImportRevisionCopy(t *testing.T) { _, err := dataStore.Add(key, 0, docBody) assert.NoError(t, err, "Unable to insert doc TestImportDelete") - // 2. Trigger import via SG retrieval + // 2. Trigger import via SG retrieval, this will not populate the rev cache. 
response := rt.SendAdminRequest("GET", "/{{.keyspace}}/_raw/"+key, "") assert.Equal(t, 200, response.Code) var rawInsertResponse rest.RawResponse @@ -1618,6 +1618,9 @@ func TestImportRevisionCopy(t *testing.T) { assert.NoError(t, err, "Unable to unmarshal raw response") rev1id := rawInsertResponse.Sync.Rev.RevTreeID + // Populate rev cache by getting the doc again + rt.GetDoc(key) + // 3. Update via SDK updatedBody := make(map[string]interface{}) updatedBody["test"] = "TestImportRevisionCopyModified" @@ -2324,6 +2327,8 @@ func TestImportRollback(t *testing.T) { // - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back // - Test case rollbackWithoutFailover will only rollback one partition func TestImportRollbackMultiplePartitions(t *testing.T) { + t.Skip("test will fail on this branch, no cbgt update on here yet, CBG-4505") + if !base.IsEnterpriseEdition() { t.Skip("This test only works against EE") }