package org.bitcoinj.store;

import org.bitcoinj.core.*;
import org.bitcoinj.utils.*;
import org.slf4j.*;

import javax.annotation.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.locks.*;

import static com.google.common.base.Preconditions.*;

/**
 * An SPVDirectIOBlockStore holds a limited number of block headers in a ring buffer that is kept in a direct
 * ByteBuffer and explicitly flushed to a backing file. With such a store, you may not be able to process very
 * deep re-orgs and could be disconnected from the chain (requiring a replay), but as they are virtually unheard
 * of this is not a significant risk.
 *
 * <p>Unlike {@link SPVBlockStore} this class uses direct IO instead of mmap and will be slower.</p>
 *
 * <p>Thread-safety: all public operations take the internal lock; the store may be shared between threads.</p>
 */
public class SPVDirectIOBlockStore implements BlockStore {
    private static final Logger log = LoggerFactory.getLogger(SPVDirectIOBlockStore.class);

    /** The default number of headers that will be stored in the ring buffer. */
    public static final int DEFAULT_CAPACITY = 5000;
    /** Magic bytes at offset 0 identifying the file as an SPV block store. */
    public static final String HEADER_MAGIC = "SPVB";

    // In-memory image of the whole store file; null once the store has been closed.
    protected ByteBuffer buffer;
    protected final NetworkParameters params;

    protected ReentrantLock lock = Threading.lock("SPVDirectIOBlockStore");

    // The ring buffer is persisted through a FileChannel, avoiding MappedByteBuffer unmap issues.
    protected FileChannel fileChannel;
    protected FileLock fileLock = null;
    protected RandomAccessFile randomAccessFile;
    private final int fileLength;

    // Caches the most recently used blocks so get() usually avoids a linear ring scan.
    protected LinkedHashMap<Sha256Hash, StoredBlock> blockCache = new LinkedHashMap<Sha256Hash, StoredBlock>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<Sha256Hash, StoredBlock> entry) {
            return size() > 2050; // Slightly more than the difficulty transition period.
        }
    };

    private static final Object NOT_FOUND_MARKER = new Object();
    // Remembers recent negative lookups so repeated misses don't rescan the whole ring.
    protected LinkedHashMap<Sha256Hash, Object> notFoundCache = new LinkedHashMap<Sha256Hash, Object>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<Sha256Hash, Object> entry) {
            return size() > 100; // This was chosen arbitrarily.
        }
    };

    /**
     * Creates and initializes an SPV block store that can hold {@link #DEFAULT_CAPACITY} block headers. Will create the
     * given file if it's missing. This operation will block on disk.
     * @param params network parameters
     * @param file file to use for the block store
     * @throws BlockStoreException if something goes wrong
     */
    public SPVDirectIOBlockStore(NetworkParameters params, File file) throws BlockStoreException {
        this(params, file, DEFAULT_CAPACITY, false);
    }

    /**
     * Creates and initializes an SPV block store that can hold a given amount of blocks. Will create the given file if
     * it's missing. This operation will block on disk.
     * @param params network parameters
     * @param file file to use for the block store
     * @param capacity custom capacity in number of block headers
     * @param grow whether an existing, smaller file may be grown to the requested capacity
     * @throws BlockStoreException if something goes wrong, including a capacity mismatch when {@code grow} is false
     */
    public SPVDirectIOBlockStore(NetworkParameters params, File file, int capacity, boolean grow) throws BlockStoreException {
        checkNotNull(file);
        this.params = checkNotNull(params);
        checkArgument(capacity > 0);
        try {
            boolean exists = file.exists();
            // Set up the backing file.
            randomAccessFile = new RandomAccessFile(file, "rw");
            fileLength = getFileSize(capacity);

            if (!exists) {
                log.info("Creating new SPV block chain file " + file);
                randomAccessFile.setLength(fileLength);
            } else {
                final long currentLength = randomAccessFile.length();
                if (currentLength != fileLength) {
                    // A valid store is always prologue + a whole number of records.
                    if ((currentLength - FILE_PROLOGUE_BYTES) % RECORD_SIZE != 0)
                        throw new BlockStoreException(
                                "File size on disk indicates this is not a block store: " + currentLength);
                    else if (!grow)
                        throw new BlockStoreException("File size on disk does not match expected size: " + currentLength
                                + " vs " + fileLength);
                    else if (fileLength < currentLength)
                        throw new BlockStoreException(
                                "Shrinking is unsupported: " + currentLength + " vs " + fileLength);
                    else
                        randomAccessFile.setLength(fileLength); // Grow: the new tail is zero-filled.
                }
            }
            fileChannel = randomAccessFile.getChannel();
            fileLock = fileChannel.tryLock();
            if (fileLock == null)
                throw new ChainFileLockedException("Store file is already locked by another process");

            // Load the entire file into memory; all reads are served from this buffer from now on.
            buffer = ByteBuffer.allocateDirect(fileLength);
            int bytesRead = fileChannel.read(buffer);
            if (bytesRead > 0) {
                buffer.flip(); // Only flip if data was read.
            } else {
                buffer.clear(); // Empty file: start writing at position 0.
            }

            if (!exists) {
                initNewStore(params);
            } else {
                byte[] header = new byte[4];
                buffer.get(header);
                if (!new String(header, StandardCharsets.US_ASCII).equals(HEADER_MAGIC)) {
                    throw new BlockStoreException("Invalid file header");
                }
            }
        } catch (Exception e) {
            // Best effort cleanup so the file is not left locked by a half-constructed store.
            try {
                if (randomAccessFile != null) randomAccessFile.close();
            } catch (IOException e2) {
                throw new BlockStoreException(e2);
            }
            throw new BlockStoreException(e);
        }
    }

    /** Writes the file prologue and the genesis block, and flushes everything to disk. */
    private void initNewStore(NetworkParameters params) throws IOException, BlockStoreException {
        byte[] header = HEADER_MAGIC.getBytes(StandardCharsets.US_ASCII);
        buffer.put(header);

        lock.lock();
        try {
            setRingCursor(buffer, FILE_PROLOGUE_BYTES);
        } finally {
            lock.unlock();
        }
        Block genesis = params.getGenesisBlock().cloneAsHeader();
        StoredBlock storedGenesis = new StoredBlock(genesis, genesis.getWork(), 0);
        put(storedGenesis);
        setChainHead(storedGenesis);
        flushBuffer();
    }

    /** Returns the size in bytes of the file that is used to store the chain with the current parameters. */
    public static final int getFileSize(int capacity) {
        return RECORD_SIZE * capacity + FILE_PROLOGUE_BYTES /* extra kilobyte for stuff */;
    }

    @Override
    public void put(StoredBlock block) throws BlockStoreException {
        final ByteBuffer buffer = this.buffer;
        if (buffer == null) throw new BlockStoreException("Store closed");

        lock.lock();
        try {
            int cursor = getRingCursor(buffer);
            if (cursor == buffer.capacity()) {
                // We reached the end, wrap to the first record and start overwriting the oldest headers.
                cursor = FILE_PROLOGUE_BYTES;
            }
            buffer.position(cursor);
            Sha256Hash hash = block.getHeader().getHash();
            notFoundCache.remove(hash);
            buffer.put(hash.getBytes());
            block.serializeCompact(buffer);
            setRingCursor(buffer, buffer.position());
            blockCache.put(hash, block);
            flushBuffer();
        } catch (IOException e) {
            throw new BlockStoreException(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    @Nullable
    public StoredBlock get(Sha256Hash hash) throws BlockStoreException {
        final ByteBuffer buffer = this.buffer;
        if (buffer == null) throw new BlockStoreException("Store closed");

        lock.lock();
        try {
            StoredBlock cacheHit = blockCache.get(hash);
            if (cacheHit != null)
                return cacheHit;
            if (notFoundCache.get(hash) != null)
                return null;

            // Scan the ring backwards starting from the most recently written record.
            int cursor = getRingCursor(buffer);
            final int startingPoint = cursor;
            final byte[] targetHashBytes = hash.getBytes();
            byte[] scratch = new byte[32];
            do {
                cursor -= RECORD_SIZE;
                if (cursor < FILE_PROLOGUE_BYTES) {
                    // We hit the start, so wrap around.
                    cursor = buffer.capacity() - RECORD_SIZE;
                }
                // Cursor is now at the start of the next record to check, so read the hash and compare it.
                buffer.position(cursor);
                buffer.get(scratch);
                if (Arrays.equals(scratch, targetHashBytes)) {
                    StoredBlock storedBlock = StoredBlock.deserializeCompact(params, buffer);
                    blockCache.put(hash, storedBlock);
                    return storedBlock;
                }
            } while (cursor != startingPoint);
            // Not found.
            notFoundCache.put(hash, NOT_FOUND_MARKER);
            return null;
        } catch (ProtocolException e) {
            throw new BlockStoreException(e);
        } finally {
            lock.unlock();
        }
    }

    // Cached chain head; lazily loaded from the prologue on first access.
    protected StoredBlock lastChainHead = null;

    @Override
    public StoredBlock getChainHead() throws BlockStoreException {
        final ByteBuffer buffer = this.buffer;
        if (buffer == null) throw new BlockStoreException("Store closed");

        lock.lock();
        try {
            if (lastChainHead == null) {
                byte[] headHash = new byte[32];
                buffer.position(8); // Head hash lives right after the 4 magic + 4 cursor bytes.
                buffer.get(headHash);
                Sha256Hash hash = Sha256Hash.wrap(headHash);
                StoredBlock block = get(hash);
                if (block == null)
                    throw new BlockStoreException("Corrupted block store: could not find chain head: " + hash);
                lastChainHead = block;
            }
            return lastChainHead;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void setChainHead(StoredBlock chainHead) throws BlockStoreException {
        if (buffer == null) throw new BlockStoreException("Store closed");

        lock.lock();
        try {
            lastChainHead = chainHead;
            byte[] headHash = chainHead.getHeader().getHash().getBytes();
            buffer.position(8);
            buffer.put(headHash);
            flushBuffer();
        } catch (IOException e) {
            throw new BlockStoreException(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() throws BlockStoreException {
        lock.lock();
        try {
            if (buffer == null)
                return; // Already closed; closing twice is a no-op.
            flushBuffer();
            // Null out the buffer so subsequent put/get calls fail with "Store closed" instead of
            // operating on a closed channel.
            buffer = null;
            if (fileLock != null)
                fileLock.release();
            fileChannel.close();
            randomAccessFile.close();
            blockCache.clear();
        } catch (IOException e) {
            throw new BlockStoreException(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public NetworkParameters getParams() {
        return params;
    }

    protected static final int RECORD_SIZE = 32 /* hash */ + StoredBlock.COMPACT_SERIALIZED_SIZE;

    // File format:
    //   4 header bytes = "SPVB"
    //   4 cursor bytes, which indicate the offset from the start of the file where the next record goes.
    //   32 bytes for the hash of the chain head
    //
    // For each header (128 bytes)
    //   32 bytes hash of the header
    //   12 bytes of chain work
    //    4 bytes of height
    //   80 bytes of block header data
    protected static final int FILE_PROLOGUE_BYTES = 1024;

    /** Reads the write cursor from the prologue; always >= FILE_PROLOGUE_BYTES in a valid store. */
    private int getRingCursor(ByteBuffer buffer) {
        if (buffer.limit() < 8) {
            throw new IllegalStateException("Buffer does not contain enough data to read ring cursor.");
        }
        int c = buffer.getInt(4);
        checkState(c >= FILE_PROLOGUE_BYTES, "Integer overflow");
        return c;
    }

    /** Stores the write cursor into the prologue (absolute write; does not move the buffer position). */
    private void setRingCursor(ByteBuffer buffer, int newCursor) {
        checkArgument(newCursor >= 0);
        buffer.putInt(4, newCursor);
    }

    /**
     * Writes the buffer contents from offset 0 up to the current position back to the start of the file.
     * Records beyond the position are unchanged since the last flush, so the on-disk file stays current.
     * NOTE(review): there is no fsync/force() here, so an OS crash may still lose the latest records.
     */
    private void flushBuffer() throws IOException {
        if (buffer.position() > 0) { // Nothing to do if no bytes were written since the last flush.
            buffer.flip();
            fileChannel.position(0);
            // FileChannel.write() may write fewer bytes than requested; loop until drained.
            while (buffer.hasRemaining())
                fileChannel.write(buffer);
            buffer.compact(); // Resets position to 0 without disturbing the absolute buffer contents.
        }
    }

    /**
     * Walks backwards from the chain head following prev-hashes until the requested height is found.
     * Only finds blocks on the best chain and only as far back as the ring buffer reaches.
     * @return the block at {@code blockHeight}, or null if it is above the head or no longer stored
     */
    @Nullable
    public StoredBlock get(int blockHeight) throws BlockStoreException {
        lock.lock();
        try {
            StoredBlock cursor = getChainHead();

            if (cursor.getHeight() < blockHeight)
                return null;

            while (cursor != null) {
                if (cursor.getHeight() == blockHeight)
                    return cursor;

                cursor = get(cursor.getHeader().getPrevBlockHash());
            }

            return null;
        } finally { lock.unlock(); }
    }

    /**
     * Starting from {@code hash}, follows successors forward as far as the in-memory block cache allows
     * and returns the furthest descendant found (or null if {@code hash} itself is unknown).
     * NOTE(review): forward links come from {@link #getNextBlock(Sha256Hash)}, which only consults the
     * cache, so this may stop early even though deeper descendants exist on disk.
     */
    @Override
    public StoredBlock getChainHeadFromHash(Sha256Hash hash) throws BlockStoreException {
        StoredBlock cursor = get(hash);
        StoredBlock current = cursor;
        while (cursor != null) {
            cursor = getNextBlock(cursor.getHeader().getHash());
            if (cursor == null)
                return current;
            current = cursor;
        }

        return null;
    }

    /** Linear search of the block cache (not the file) for a block whose prev-hash equals {@code hash}. */
    private StoredBlock getNextBlock(Sha256Hash hash) {
        for (Map.Entry<Sha256Hash, StoredBlock> entry : blockCache.entrySet()) {
            if (entry.getValue().getHeader().getPrevBlockHash().equals(hash))
                return entry.getValue();
        }
        return null;
    }

    /**
     * Wipes the store back to a freshly-initialized state containing only the genesis block.
     * The zeroed image is flushed to disk before re-initialization so stale records cannot survive a reopen.
     */
    public void clear() throws Exception {
        lock.lock();
        try {
            // Clear caches.
            blockCache.clear();
            notFoundCache.clear();
            lastChainHead = null;
            // Zero the in-memory image of the whole file.
            buffer.clear();
            while (buffer.hasRemaining())
                buffer.put((byte) 0);
            // Persist the zeroed state; without this, old records would remain on disk.
            flushBuffer();
            // Initialize store again.
            buffer.position(0);
            initNewStore(params);
        } finally { lock.unlock(); }
    }
}
+ */ + +package org.bitcoinj.store; + +import com.google.common.base.Stopwatch; +import org.bitcoinj.core.Address; +import org.bitcoinj.core.Block; +import org.bitcoinj.core.ECKey; +import org.bitcoinj.core.NetworkParameters; +import org.bitcoinj.core.Sha256Hash; +import org.bitcoinj.core.StoredBlock; +import org.bitcoinj.core.Transaction; +import org.bitcoinj.core.Utils; +import org.bitcoinj.params.UnitTestParams; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.math.BigInteger; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class SPVBlockStoreV2Test { + private static NetworkParameters UNITTEST; + private File blockStoreFile; + + @BeforeClass + public static void setUpClass() throws Exception { + Utils.resetMocking(); + UNITTEST = UnitTestParams.get(); + } + + @Before + public void setup() throws Exception { + blockStoreFile = File.createTempFile("SPVBlockStoreV2", null); + blockStoreFile.delete(); + blockStoreFile.deleteOnExit(); + } + + @Test + public void basics() throws Exception { + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + + Address to = Address.fromKey(UNITTEST, new ECKey()); + // Check the first block in a new store is the genesis block. + StoredBlock genesis = store.getChainHead(); + assertEquals(UNITTEST.getGenesisBlock(), genesis.getHeader()); + assertEquals(0, genesis.getHeight()); + + // Build a new block. + StoredBlock b1 = genesis.build(genesis.getHeader().createNextBlock(to).cloneAsHeader()); + store.put(b1); + store.setChainHead(b1); + store.close(); + + // Check we can get it back out again if we rebuild the store object. 
+ store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + StoredBlock b2 = store.get(b1.getHeader().getHash()); + assertEquals(b1, b2); + // Check the chain head was stored correctly also. + StoredBlock chainHead = store.getChainHead(); + assertEquals(b1, chainHead); + store.close(); + } + + @Test(expected = BlockStoreException.class) + public void twoStores_onSameFile() throws Exception { + new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + } + + @Test + public void twoStores_butSequentially() throws Exception { + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + store.close(); + store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + } + + @Test(expected = BlockStoreException.class) + public void twoStores_sequentially_butMismatchingCapacity() throws Exception { + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile, 10, false); + store.close(); + store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile, 20, false); + } + + @Test + public void twoStores_sequentially_grow() throws Exception { + Address to = Address.fromKey(UNITTEST, new ECKey()); + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile, 10, true); + final StoredBlock block0 = store.getChainHead(); + final StoredBlock block1 = block0.build(block0.getHeader().createNextBlock(to).cloneAsHeader()); + store.put(block1); + final StoredBlock block2 = block1.build(block1.getHeader().createNextBlock(to).cloneAsHeader()); + store.put(block2); + store.setChainHead(block2); + store.close(); + + store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile, 20, true); + final StoredBlock read2 = store.getChainHead(); + assertEquals(block2, read2); + final StoredBlock read1 = read2.getPrev(store); + assertEquals(block1, read1); + final StoredBlock read0 = read1.getPrev(store); + assertEquals(block0, read0); + store.close(); + 
assertEquals(SPVDirectIOBlockStore.getFileSize(20), blockStoreFile.length()); + } + + @Test(expected = BlockStoreException.class) + public void twoStores_sequentially_shrink() throws Exception { + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile, 20, true); + store.close(); + store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile, 10, true); + } + + @Test + @Ignore + public void performanceTest() throws BlockStoreException { + // On slow machines, this test could fail. Then either add @Ignore or adapt the threshold and please report to + // us. + final int ITERATIONS = 100000; + final long THRESHOLD_MS = 2000; + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + Stopwatch watch = Stopwatch.createStarted(); + for (int i = 0; i < ITERATIONS; i++) { + // Using i as the nonce so that the block hashes are different. + Block block = new Block(UNITTEST, 0, Sha256Hash.ZERO_HASH, Sha256Hash.ZERO_HASH, 0, 0, i, + Collections. emptyList()); + StoredBlock b = new StoredBlock(block, BigInteger.ZERO, i); + store.put(b); + store.setChainHead(b); + } + assertTrue("took " + watch + " for " + ITERATIONS + " iterations", + watch.elapsed(TimeUnit.MILLISECONDS) < THRESHOLD_MS); + store.close(); + } + + @Test + public void clear() throws Exception { + SPVDirectIOBlockStore store = new SPVDirectIOBlockStore(UNITTEST, blockStoreFile); + + // Build a new block. + Address to = Address.fromKey(UNITTEST, new ECKey()); + StoredBlock genesis = store.getChainHead(); + StoredBlock b1 = genesis.build(genesis.getHeader().createNextBlock(to).cloneAsHeader()); + store.put(b1); + store.setChainHead(b1); + assertEquals(b1.getHeader().getHash(), store.getChainHead().getHeader().getHash()); + store.clear(); + assertNull(store.get(b1.getHeader().getHash())); + assertEquals(UNITTEST.getGenesisBlock().getHash(), store.getChainHead().getHeader().getHash()); + store.close(); + } +}