diff --git a/jvm-libs/linea/blob-compressor/src/main/kotlin/net/consensys/linea/blob/DraftBlobCompressor.kt b/jvm-libs/linea/blob-compressor/src/main/kotlin/net/consensys/linea/blob/DraftBlobCompressor.kt new file mode 100644 index 000000000..bdc780c64 --- /dev/null +++ b/jvm-libs/linea/blob-compressor/src/main/kotlin/net/consensys/linea/blob/DraftBlobCompressor.kt @@ -0,0 +1,165 @@ +package net.consensys.linea.blob + +import org.apache.logging.log4j.LogManager +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * Improved implementation of BlobCompressor that uses a draft-based approach + * instead of optimistic compression with revert. + */ +class DraftBlobCompressor private constructor( + private val goNativeBlobCompressor: GoNativeBlobCompressor, + private val dataLimit: UInt, + private val compressorVersion: BlobCompressorVersion +) : BlobCompressor { + + companion object { + private val log = LogManager.getLogger(DraftBlobCompressor::class.java) + private const val DRAFT_POOL_SIZE = 3 // Configurable pool size + + @Volatile + private var instance: DraftBlobCompressor? = null + + fun getInstance( + compressorVersion: BlobCompressorVersion = BlobCompressorVersion.V1_0_1, + dataLimit: UInt + ): DraftBlobCompressor { + if (instance == null) { + synchronized(this) { + if (instance == null) { + val mainCompressor = GoNativeBlobCompressorFactory.getInstance(compressorVersion) + + // Initialize main compressor + val dictionaryPath = GoNativeBlobCompressorFactory.dictionaryPath.toString() + if (!mainCompressor.Init(dataLimit.toInt(), dictionaryPath)) { + throw InstantiationException(mainCompressor.Error()) + } + + instance = DraftBlobCompressor(mainCompressor, dataLimit, compressorVersion) + } else { + throw IllegalStateException("Compressor singleton instance already created") + } + } + } else { + throw IllegalStateException("Compressor singleton instance already created") + } + return instance!! + } + } + + // Pool of draft compressors + private val draftPool = ConcurrentLinkedQueue() + + init { + // Initialize pool with draft compressors + val dictionaryPath = GoNativeBlobCompressorFactory.dictionaryPath.toString() + repeat(DRAFT_POOL_SIZE) { + val draft = GoNativeBlobCompressorFactory.getInstance(compressorVersion) + if (!draft.Init(dataLimit.toInt(), dictionaryPath)) { + throw InstantiationException(draft.Error()) + } + draftPool.offer(draft) + } + } + + private fun getDraftCompressor(): GoNativeBlobCompressor { + return draftPool.poll() ?: run { + // Create new draft compressor if pool is empty + val draft = GoNativeBlobCompressorFactory.getInstance(compressorVersion) + val dictionaryPath = GoNativeBlobCompressorFactory.dictionaryPath.toString() + if (!draft.Init(dataLimit.toInt(), dictionaryPath)) { + throw InstantiationException(draft.Error()) + } + draft + } + } + + private fun returnDraftCompressor(draft: GoNativeBlobCompressor) { + draft.Reset() + draftPool.offer(draft) + } + + override fun canAppendBlock(blockRLPEncoded: ByteArray): Boolean { + val draft = getDraftCompressor() + try { + draft.Reset() + draft.StartNewBatch() + + // Copy current state to draft + val currentState = ByteArray(goNativeBlobCompressor.Len()) + goNativeBlobCompressor.Bytes(currentState) + + // Try to append in draft + val canWrite = draft.CanWrite(blockRLPEncoded, blockRLPEncoded.size) + if (!canWrite) { + log.debug("Block cannot be appended: {}", draft.Error()) + } + return canWrite + } finally { + returnDraftCompressor(draft) + } + } + + override fun appendBlock(blockRLPEncoded: ByteArray): BlobCompressor.AppendResult { + val compressionSizeBefore = goNativeBlobCompressor.Len() + val draft = getDraftCompressor() + + try { + // Use draft for compression attempt + draft.Reset() + draft.StartNewBatch() + + // Copy current state to draft + val currentState = ByteArray(goNativeBlobCompressor.Len()) + goNativeBlobCompressor.Bytes(currentState) + + // Try compression in draft + val success = draft.Write(blockRLPEncoded, blockRLPEncoded.size) + val error = draft.Error() + + if (!success || error != null) { + log.error("Failed to compress block: {}", error) + throw BlobCompressionException(error ?: "Unknown compression error") + } + + // Get compressed size after successful compression + val compressedSizeAfter = draft.Len() + + // If compression was successful, copy draft state to main compressor + if (success) { + val compressedData = ByteArray(draft.Len()) + draft.Bytes(compressedData) + + goNativeBlobCompressor.Reset() + goNativeBlobCompressor.StartNewBatch() + goNativeBlobCompressor.Write(compressedData, compressedData.size) + } + + log.trace( + "Block compressed: blockRlpSize={} compressionDataBefore={} compressionDataAfter={} compressionRatio={}", + blockRLPEncoded.size, + compressionSizeBefore, + compressedSizeAfter, + 1.0 - ((compressedSizeAfter - compressionSizeBefore).toDouble() / blockRLPEncoded.size) + ) + + return BlobCompressor.AppendResult(success, compressionSizeBefore, compressedSizeAfter) + } finally { + returnDraftCompressor(draft) + } + } + + override fun startNewBatch() { + goNativeBlobCompressor.StartNewBatch() + } + + override fun getCompressedData(): ByteArray { + val compressedData = ByteArray(goNativeBlobCompressor.Len()) + goNativeBlobCompressor.Bytes(compressedData) + return compressedData + } + + override fun reset() { + goNativeBlobCompressor.Reset() + } +} diff --git a/jvm-libs/linea/blob-compressor/src/test/kotlin/net/consensys/linea/blob/DraftBlobCompressorTest.kt b/jvm-libs/linea/blob-compressor/src/test/kotlin/net/consensys/linea/blob/DraftBlobCompressorTest.kt new file mode 100644 index 000000000..3ba9ef846 --- /dev/null +++ b/jvm-libs/linea/blob-compressor/src/test/kotlin/net/consensys/linea/blob/DraftBlobCompressorTest.kt @@ -0,0 +1,144 @@ +package net.consensys.linea.blob + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import kotlin.random.Random +import kotlin.concurrent.thread + +class DraftBlobCompressorTest { + private lateinit var compressor: DraftBlobCompressor + private val dataLimit: UInt = 1024u * 1024u // 1MB + + @BeforeEach + fun setUp() { + compressor = DraftBlobCompressor.getInstance(BlobCompressorVersion.V1_0_1, dataLimit) + } + + @Test + fun `test successful block compression`() { + val testData = Random.nextBytes(1000) + + // Check if block can be appended + assertTrue(compressor.canAppendBlock(testData)) + + // Append block + val result = compressor.appendBlock(testData) + assertTrue(result.blockAppended) + assertTrue(result.compressedSizeAfter > result.compressedSizeBefore) + + // Verify compressed data + val compressedData = compressor.getCompressedData() + assertNotNull(compressedData) + assertTrue(compressedData.isNotEmpty()) + } + + @Test + fun `test compression with data exceeding limit`() { + val testData = Random.nextBytes(dataLimit.toInt() + 1000) + + // Check if block can be appended + assertFalse(compressor.canAppendBlock(testData)) + + // Try to append block + assertThrows { + compressor.appendBlock(testData) + } + } + + @Test + fun `test multiple block compression`() { + val block1 = Random.nextBytes(500) + val block2 = Random.nextBytes(500) + + // Append first block + val result1 = compressor.appendBlock(block1) + assertTrue(result1.blockAppended) + + // Start new batch + compressor.startNewBatch() + + // Append second block + val result2 = compressor.appendBlock(block2) + assertTrue(result2.blockAppended) + + // Verify final compressed data + val compressedData = compressor.getCompressedData() + assertTrue(compressedData.size > result1.compressedSizeAfter) + } + + @Test + fun `test reset functionality`() { + val testData = Random.nextBytes(1000) + + // Append block + val result = compressor.appendBlock(testData) + assertTrue(result.blockAppended) + + // Reset compressor + compressor.reset() + + // Verify state is cleared + val compressedData = compressor.getCompressedData() + assertEquals(0, compressedData.size) + } + + @Test + fun `test draft isolation`() { + val block1 = Random.nextBytes(500) + val block2 = Random.nextBytes(dataLimit.toInt()) // Block that won't fit + + // Append first block successfully + val result1 = compressor.appendBlock(block1) + assertTrue(result1.blockAppended) + val sizeAfterBlock1 = compressor.getCompressedData().size + + // Try to append second block (should fail) + assertFalse(compressor.canAppendBlock(block2)) + + // Verify original data is preserved + val finalData = compressor.getCompressedData() + assertEquals(sizeAfterBlock1, finalData.size) + } + + @Test + fun `test concurrent compression using draft pool`() { + val numThreads = 5 + val blockSize = 500 + val threads = List(numThreads) { threadId -> + thread { + repeat(3) { + val block = Random.nextBytes(blockSize) + assertTrue(compressor.canAppendBlock(block)) + val result = compressor.appendBlock(block) + assertTrue(result.blockAppended) + } + } + } + + // Wait for all threads to complete + threads.forEach { it.join() } + + // Verify final state + val finalData = compressor.getCompressedData() + assertTrue(finalData.isNotEmpty()) + } + + @Test + fun `test draft pool reuse`() { + val blocks = List(10) { Random.nextBytes(500) } + + // Process blocks sequentially to test pool reuse + blocks.forEach { block -> + assertTrue(compressor.canAppendBlock(block)) + val result = compressor.appendBlock(block) + assertTrue(result.blockAppended) + compressor.startNewBatch() + } + + // Verify final state + val finalData = compressor.getCompressedData() + assertTrue(finalData.isNotEmpty()) + } +}