Skip to content

Commit

Permalink
blockstore: separate write and prune batches.
Browse files Browse the repository at this point in the history
ChainDB and BlockStore are separate and committing to them is not really atomic,
so we need to make sure the order operations can handle failure at any step.
Previously blockstore writes and prunes would happen after the chaindb was
written, but that could lead to the situation where the blockstore did not
actually have a block but chaindb had the information.
  This PR separates write and prune batches for the blockstore, so we can try
writing to blockstore first. In case writing to blockstore fails whole operation
will get aborted. Nothing gets written into the chaindb, so the information is
consistent. In case blockstore writes a block and then chaindb write fails,
we just get extra data in the blockstore. But most of the time, the same block
write will happen again (either main or alt chains).
  Pruning always happens after the chaindb was updated to avoid situation, where
chaindb thinks the data is stored, but blockstore has removed it. If blockstore
fails to prune after chaindb removed the information, worst case scenario
(prune mode) we get a maxFileLength(128 MB by default) space wasted.
  • Loading branch information
nodech committed Feb 8, 2022
1 parent 34a3b00 commit 86df238
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 65 deletions.
8 changes: 6 additions & 2 deletions lib/blockchain/chaindb.js
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,16 @@ class ChainDB {
assert(this.pending);

try {
if (this.blocks)
await this.blocksBatch.commitWrites();

await this.current.write();
} catch (e) {
this.current = null;
this.pending = null;
this.cacheHash.drop();
this.cacheHeight.drop();
this.blocksBatch = null;
throw e;
}

Expand All @@ -329,7 +333,7 @@ class ChainDB {
this.stateCache.commit();

if (this.blocks)
await this.blocksBatch.write();
await this.blocksBatch.commitPrunes();
}

/**
Expand Down Expand Up @@ -918,7 +922,7 @@ class ChainDB {
// We do blockstore write first, because if something
// fails during this batch, then db flag wont be set.
// If user just reruns the node prune will restart.
await blocksBatch.write();
await blocksBatch.commit();

try {
options.prune = true;
Expand Down
76 changes: 49 additions & 27 deletions lib/blockstore/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,14 @@ class FileBatch {

constructor(blocks) {
this.blocks = blocks;
this.ops = [];
this.written = false;
this.writes = [];
this.prunes = [];
this.committedWrites = false;
this.committedPrunes = false;
}

get written() {
return this.committedWrites && this.committedPrunes;
}

/**
Expand All @@ -676,7 +682,7 @@ class FileBatch {
*/

writeMerkle(hash, data) {
this.ops.push(new WriteOp(types.MERKLE, hash, data));
this.writes.push(new WriteOp(types.MERKLE, hash, data));
}

/**
Expand All @@ -687,7 +693,7 @@ class FileBatch {
*/

writeUndo(hash, data) {
this.ops.push(new WriteOp(types.UNDO, hash, data));
this.writes.push(new WriteOp(types.UNDO, hash, data));
}

/**
Expand All @@ -698,7 +704,7 @@ class FileBatch {
*/

writeBlock(hash, data) {
this.ops.push(new WriteOp(types.BLOCK, hash, data));
this.writes.push(new WriteOp(types.BLOCK, hash, data));
}

/**
Expand All @@ -708,7 +714,7 @@ class FileBatch {
*/

pruneMerkle(hash) {
this.ops.push(new PruneOp(types.MERKLE, hash));
this.prunes.push(new PruneOp(types.MERKLE, hash));
}

/**
Expand All @@ -718,7 +724,7 @@ class FileBatch {
*/

pruneUndo(hash) {
this.ops.push(new PruneOp(types.UNDO, hash));
this.prunes.push(new PruneOp(types.UNDO, hash));
}

/**
Expand All @@ -728,7 +734,7 @@ class FileBatch {
*/

pruneBlock(hash) {
this.ops.push(new PruneOp(types.BLOCK, hash));
this.prunes.push(new PruneOp(types.BLOCK, hash));
}

/**
Expand All @@ -737,33 +743,49 @@ class FileBatch {
*/

clear() {
assert(!this.written, 'Already written.');
this.ops.length = 0;
assert(!this.written, 'Already written all.');
this.writes.length = 0;
this.prunes.length = 0;
}

/**
* Write change to the store.
* Commit only writes.
* @returns {Promise}
*/

async write() {
assert(!this.written, 'Already written.');
async commitWrites() {
assert(!this.committedWrites, 'Already written writes.');

for (const op of this.ops) {
switch(op.type) {
case WRITE:
await this.blocks._write(op.writeType, op.hash, op.data);
break;
case PRUNE:
await this.blocks._prune(op.pruneType, op.hash);
break;
default:
throw new Error('Bad Op.');
}
}
for (const op of this.writes)
await this.blocks._write(op.writeType, op.hash, op.data);

this.committedWrites = true;
}

/**
* Commit only prunes.
* @returns {Promise}
*/

async commitPrunes() {
assert(!this.committedPrunes, 'Already written prunes.');

for (const op of this.prunes)
await this.blocks._prune(op.pruneType, op.hash);

this.committedPrunes = true;
}

/**
* Commit both.
* @returns {Promise}
*/

async commit() {
assert(!this.written, 'Already written all.');

this.ops.length = 0;
this.written = true;
await this.commitWrites();
await this.commitPrunes();
}
}

Expand Down
52 changes: 37 additions & 15 deletions lib/blockstore/level.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,14 @@ class LevelBatch {
*/

constructor(db) {
this.batch = db.batch();
this.written = false;
this.writesBatch = db.batch();
this.prunesBatch = db.batch();
this.committedWrites = false;
this.committedPrunes = false;
}

get written() {
return this.committedPrunes && this.committedWrites;
}

/**
Expand All @@ -265,7 +271,7 @@ class LevelBatch {
*/

writeMerkle(hash, data) {
this.batch.put(layout.b.encode(types.MERKLE, hash), data);
this.writesBatch.put(layout.b.encode(types.MERKLE, hash), data);
return this;
}

Expand All @@ -277,7 +283,7 @@ class LevelBatch {
*/

writeUndo(hash, data) {
this.batch.put(layout.b.encode(types.UNDO, hash), data);
this.writesBatch.put(layout.b.encode(types.UNDO, hash), data);
return this;
}

Expand All @@ -289,7 +295,7 @@ class LevelBatch {
*/

writeBlock(hash, data) {
this.batch.put(layout.b.encode(types.BLOCK, hash), data);
this.writesBatch.put(layout.b.encode(types.BLOCK, hash), data);
return this;
}

Expand All @@ -300,7 +306,7 @@ class LevelBatch {
*/

pruneMerkle(hash) {
this.batch.del(layout.b.encode(types.MERKLE, hash));
this.prunesBatch.del(layout.b.encode(types.MERKLE, hash));
return this;
}

Expand All @@ -311,7 +317,7 @@ class LevelBatch {
*/

pruneUndo(hash) {
this.batch.del(layout.b.encode(types.UNDO, hash));
this.prunesBatch.del(layout.b.encode(types.UNDO, hash));
return this;
}

Expand All @@ -322,7 +328,7 @@ class LevelBatch {
*/

pruneBlock(hash) {
this.batch.del(layout.b.encode(types.BLOCK, hash));
this.prunesBatch.del(layout.b.encode(types.BLOCK, hash));
return this;
}

Expand All @@ -332,22 +338,38 @@ class LevelBatch {
*/

clear() {
assert(!this.written, 'Already written.');
this.batch.clear();
assert(!this.written, 'Already written all.');
this.writesBatch.clear();
this.prunesBatch.clear();
return this;
}

async commitWrites() {
assert(!this.committedWrites, 'Already written writes.');

await this.writesBatch.write();

this.committedWrites = true;
}

async commitPrunes() {
assert(!this.committedPrunes, 'Already written prunes.');

await this.prunesBatch.write();

this.committedPrunes = true;
}

/**
* Write change to the store.
* @returns {Promise}
*/

async write() {
assert(!this.written, 'Already written.');

await this.batch.write();
async commit() {
assert(!this.written, 'Already written all.');

this.written = true;
await this.commitWrites();
await this.commitPrunes();
}
}

Expand Down
Loading

0 comments on commit 86df238

Please sign in to comment.