From 3579de310a93160207f4e9d16163a969387508fb Mon Sep 17 00:00:00 2001
From: Erwan Guyader
Date: Tue, 10 Dec 2024 16:39:07 +0100
Subject: [PATCH] fix: Break down large PouchDB.bulkDocs calls

Large `bulkDocs` calls (i.e. with thousands of documents to save at
once) can be the source of crashes. We're not sure if these come from
PouchDB itself or from the logger calls made for each document, but
we've found that creating smaller batches of documents, logging each
batch at once and then saving it in PouchDB prevents the crashes. It
should also result in less I/O since we're making far fewer logger
calls.

To accommodate these logging changes, we update our `jq` `path` filter
so it looks into batches for documents with the expected path.
---
 .jq                      | 20 ++++++++---
 core/pouch/index.js      | 71 +++++++++++++++++++++++++++++-----------
 test/unit/pouch/index.js | 50 +++++++++++++++++++++++++++-
 3 files changed, 116 insertions(+), 25 deletions(-)

diff --git a/.jq b/.jq
index 4d68a7765..d1683f6e7 100644
--- a/.jq
+++ b/.jq
@@ -145,12 +145,24 @@ def issues: clean | select(is_issue);
 
 def path(pattern):
   clean
   | select(
       (
-        try (.path // (.doc | .path) // (.change | .doc | .path) // (.idConflict | .newDoc | .path) // (.event | .path)),
-        try (.oldpath // (.was | .path) // (.idConflict | .existingDoc | .path) // (.event | .oldPath)),
+        try (
+          .path //
+          (.doc | .path) //
+          (.idConflict | .newDoc | .path) //
+          (.event | .path) //
+          (.docs | arrays | .[].path) //
+          (.batch | arrays | .[].path)
+        ),
+        try (
+          .oldpath //
+          (.was | .path) //
+          (.idConflict | .existingDoc | .path) //
+          (.event | .oldPath)
+        ),
         ""
       )
-    | strings
-    | test(pattern)
+      | strings
+      | test(pattern)
   );

diff --git a/core/pouch/index.js b/core/pouch/index.js
index 794101806..d1d8fbdc4 100644
--- a/core/pouch/index.js
+++ b/core/pouch/index.js
@@ -29,6 +29,8 @@ const log = logger({
   component: 'Pouch'
 })
 
+const POUCHDB_BATCH_SIZE = 1000
+
 // Pouchdb is used to store all the metadata about files and folders.
 // These metadata can come from the local filesystem or the remote cozy instance.
 //
@@ -193,15 +195,23 @@
   // This method will completely erase an array of records from PouchDB while
   // removing all their attributes.
   // This method also does not care about invariants like `bulkDocs()` does.
-  async eraseDocuments(docs /*: Array<SavedMetadata> */) {
-    const docsToErase = []
-    docs.forEach(doc => {
-      const { path, _id, _rev } = _.clone(doc)
-      log.info('Erasing bulk record...', { path, _id, _rev })
-      docsToErase.push({ _id, _rev, _deleted: true })
-    })
+  async eraseDocuments(
+    docs /*: $ReadOnlyArray<SavedMetadata> */
+  ) /*: Promise<Array<PouchRecord>> */ {
+    let results = []
+    for (const batch of createBatches(docs, POUCHDB_BATCH_SIZE)) {
+      const batchResults = await this._eraseDocuments(
+        batch.map(({ _id, _rev }) => ({ _id, _rev, _deleted: true }))
+      )
+      results = results.concat(batchResults)
+    }
+    return results
+  }
 
-    const results = await this.db.bulkDocs(docsToErase)
+  async _eraseDocuments(docs /*: $ReadOnlyArray<PouchRecord> */) {
+    log.info('Erasing bulk records...', { docs })
+
+    const results = await this.db.bulkDocs(docs)
     for (let [idx, result] of results.entries()) {
       if (result.error) {
         const err = new PouchError(result)
@@ -221,19 +231,26 @@
   // WARNING: bulkDocs is not a transaction, some updates can be applied while
   // others do not.
   // Make sure lock is acquired before using it to avoid conflict.
-  async bulkDocs /*:: <T: Metadata|SavedMetadata> */(docs /*: Array<T> */) {
+  async bulkDocs(
+    docs /*: $ReadOnlyArray<Metadata|SavedMetadata> */
+  ) /*: Promise<Array<PouchRecord>> */ {
+    let results = []
+    for (const batch of createBatches(docs, POUCHDB_BATCH_SIZE)) {
+      const batchResults = await this._bulkDocs(batch)
+      results = results.concat(batchResults)
+    }
+
+    return results
+  }
+
+  // WARNING: _bulkDocs is not a transaction, some updates can be applied while
+  // others do not.
+  // Make sure lock is acquired before using it to avoid conflict.
+  async _bulkDocs(docs /*: $ReadOnlyArray<Metadata|SavedMetadata> */) {
+    log.info('Saving bulk metadata...', { docs })
+
     for (const doc of docs) {
       metadata.invariants(doc)
-      const { path, _id } = doc
-      const { local, remote } = doc.sides || {}
-      log.info('Saving bulk metadata...', {
-        path,
-        _id,
-        local,
-        remote,
-        _deleted: doc._deleted,
-        doc
-      })
     }
     const results = await this.db.bulkDocs(docs)
     for (let [idx, result] of results.entries()) {
@@ -680,4 +697,18 @@ const sortByPath = (docA /*: SavedMetadata */, docB /*: SavedMetadata */) => {
   return 0
 }
 
-module.exports = { Pouch, byPathKey, sortByPath }
+const createBatches = /*:: <T: Metadata|SavedMetadata> */ (
+  docs /*: $ReadOnlyArray<T> */,
+  batchSize /*: number */
+) /*: Array<Array<T>> */ => {
+  if (batchSize <= 0) return [[...docs]]
+
+  let batches /*: Array<Array<T>> */ = []
+  for (let i = 0; i < docs.length / batchSize; i++) {
+    const batch = docs.slice(i * batchSize, (i + 1) * batchSize)
+    batches.push(batch)
+  }
+  return batches
+}
+
+module.exports = { Pouch, byPathKey, sortByPath, createBatches }
diff --git a/test/unit/pouch/index.js b/test/unit/pouch/index.js
index b688e6432..84efaf749 100644
--- a/test/unit/pouch/index.js
+++ b/test/unit/pouch/index.js
@@ -9,7 +9,7 @@
 const _ = require('lodash')
 const { REV_CONFLICT } = require('pouchdb')
 
 const metadata = require('../../../core/metadata')
-const { sortByPath } = require('../../../core/pouch')
+const { sortByPath, createBatches } = require('../../../core/pouch')
 const Builders = require('../../support/builders')
 const configHelpers = require('../../support/helpers/config')
@@ -726,3 +726,51 @@
     })
   })
 })
+
+describe('createBatches', () => {
+  it('creates batches of at most the given size from the given documents', () => {
+    const builders = new Builders()
+
+    const docs = [
+      builders.metafile().build(),
+      builders.metafile().build(),
+      builders.metafile().build(),
+      builders.metafile().build(),
+      builders.metafile().build()
+    ]
+
+    should(createBatches(docs, 1)).deepEqual([
+      [docs[0]],
+      [docs[1]],
+      [docs[2]],
+      [docs[3]],
+      [docs[4]]
+    ])
+
+    should(createBatches(docs, 2)).deepEqual([
+      [docs[0], docs[1]],
+      [docs[2], docs[3]],
+      [docs[4]]
+    ])
+
+    should(createBatches(docs, 3)).deepEqual([
+      [docs[0], docs[1], docs[2]],
+      [docs[3], docs[4]]
+    ])
+
+    should(createBatches(docs, 5)).deepEqual([
+      [docs[0], docs[1], docs[2], docs[3], docs[4]]
+    ])
+
+    should(createBatches(docs, 6)).deepEqual([
+      [docs[0], docs[1], docs[2], docs[3], docs[4]]
+    ])
+
+    should(createBatches(docs, 0)).deepEqual([
+      [docs[0], docs[1], docs[2], docs[3], docs[4]]
+    ])
+    should(createBatches(docs, -1)).deepEqual([
+      [docs[0], docs[1], docs[2], docs[3], docs[4]]
+    ])
+  })
+})
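
Note for reviewers: the new `docs`/`batch` alternatives in the `path`
filter can be exercised on their own, outside of `clean`. A minimal
sketch (the sample log line is made up; the real filter also runs the
entry through `clean` first):

    $ echo '{"msg":"Saving bulk metadata...","docs":[{"path":"Photos/icon.png"},{"path":"Docs/report.md"}]}' \
        | jq '[.docs | arrays | .[].path | strings | test("Photos/.*")] | any'
    true

Here `arrays` drops entries whose `docs` field is missing or not an
array, and `strings` guards `test` against null paths, mirroring what
the existing filter already does for the single-document shapes.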
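
Likewise, a minimal sketch of the batching helper itself (assumes it is
run from the repository root; the bare `_id` objects are stand-ins for
real metadata documents):

    const { createBatches } = require('./core/pouch')

    // 2500 stand-in documents; the real callers pass Metadata or
    // SavedMetadata records, but createBatches only slices the array.
    const docs = Array.from({ length: 2500 }, (_, i) => ({ _id: `doc-${i}` }))

    // With POUCHDB_BATCH_SIZE = 1000, bulkDocs() will make three
    // separate PouchDB.bulkDocs calls for this set: 1000 + 1000 + 500.
    const batches = createBatches(docs, 1000)
    console.log(batches.map(batch => batch.length)) // [ 1000, 1000, 500 ]

    // A non-positive batch size disables batching: one batch with all docs.
    console.log(createBatches(docs, 0).length) // 1

Each batch is logged with a single `log.info` call before being handed
to `PouchDB.bulkDocs`, which is where the reduced I/O comes from.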