Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove reverts from compressor using draft pool approach #532

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package net.consensys.linea.blob

import org.apache.logging.log4j.LogManager
import java.util.concurrent.ConcurrentLinkedQueue

/**
* Improved implementation of BlobCompressor that uses a draft-based approach
* instead of optimistic compression with revert.
*/
class DraftBlobCompressor private constructor(
private val goNativeBlobCompressor: GoNativeBlobCompressor,
private val dataLimit: UInt,
private val compressorVersion: BlobCompressorVersion
) : BlobCompressor {

companion object {
private val log = LogManager.getLogger(DraftBlobCompressor::class.java)
private const val DRAFT_POOL_SIZE = 3 // Configurable pool size

@Volatile
private var instance: DraftBlobCompressor? = null

fun getInstance(
compressorVersion: BlobCompressorVersion = BlobCompressorVersion.V1_0_1,
dataLimit: UInt
): DraftBlobCompressor {
if (instance == null) {
synchronized(this) {
if (instance == null) {
val mainCompressor = GoNativeBlobCompressorFactory.getInstance(compressorVersion)

// Initialize main compressor
val dictionaryPath = GoNativeBlobCompressorFactory.dictionaryPath.toString()
if (!mainCompressor.Init(dataLimit.toInt(), dictionaryPath)) {
throw InstantiationException(mainCompressor.Error())
}

instance = DraftBlobCompressor(mainCompressor, dataLimit, compressorVersion)
} else {
throw IllegalStateException("Compressor singleton instance already created")
}
}
} else {
throw IllegalStateException("Compressor singleton instance already created")
}
return instance!!
}
}

// Pool of draft compressors
private val draftPool = ConcurrentLinkedQueue<GoNativeBlobCompressor>()

init {
// Initialize pool with draft compressors
val dictionaryPath = GoNativeBlobCompressorFactory.dictionaryPath.toString()
repeat(DRAFT_POOL_SIZE) {
val draft = GoNativeBlobCompressorFactory.getInstance(compressorVersion)
if (!draft.Init(dataLimit.toInt(), dictionaryPath)) {
throw InstantiationException(draft.Error())
}
draftPool.offer(draft)
}
}

private fun getDraftCompressor(): GoNativeBlobCompressor {
return draftPool.poll() ?: run {
// Create new draft compressor if pool is empty
val draft = GoNativeBlobCompressorFactory.getInstance(compressorVersion)
val dictionaryPath = GoNativeBlobCompressorFactory.dictionaryPath.toString()
if (!draft.Init(dataLimit.toInt(), dictionaryPath)) {
throw InstantiationException(draft.Error())
}
draft
}
}

private fun returnDraftCompressor(draft: GoNativeBlobCompressor) {
draft.Reset()
draftPool.offer(draft)
}

override fun canAppendBlock(blockRLPEncoded: ByteArray): Boolean {
val draft = getDraftCompressor()
try {
draft.Reset()
draft.StartNewBatch()

// Copy current state to draft
val currentState = ByteArray(goNativeBlobCompressor.Len())
goNativeBlobCompressor.Bytes(currentState)

// Try to append in draft
val canWrite = draft.CanWrite(blockRLPEncoded, blockRLPEncoded.size)
if (!canWrite) {
log.debug("Block cannot be appended: {}", draft.Error())
}
return canWrite
} finally {
returnDraftCompressor(draft)
}
}

override fun appendBlock(blockRLPEncoded: ByteArray): BlobCompressor.AppendResult {
val compressionSizeBefore = goNativeBlobCompressor.Len()
val draft = getDraftCompressor()

try {
// Use draft for compression attempt
draft.Reset()
draft.StartNewBatch()

// Copy current state to draft
val currentState = ByteArray(goNativeBlobCompressor.Len())
goNativeBlobCompressor.Bytes(currentState)

// Try compression in draft
val success = draft.Write(blockRLPEncoded, blockRLPEncoded.size)
val error = draft.Error()

if (!success || error != null) {
log.error("Failed to compress block: {}", error)
throw BlobCompressionException(error ?: "Unknown compression error")
}

// Get compressed size after successful compression
val compressedSizeAfter = draft.Len()

// If compression was successful, copy draft state to main compressor
if (success) {
val compressedData = ByteArray(draft.Len())
draft.Bytes(compressedData)

goNativeBlobCompressor.Reset()
goNativeBlobCompressor.StartNewBatch()
goNativeBlobCompressor.Write(compressedData, compressedData.size)
}

log.trace(
"Block compressed: blockRlpSize={} compressionDataBefore={} compressionDataAfter={} compressionRatio={}",
blockRLPEncoded.size,
compressionSizeBefore,
compressedSizeAfter,
1.0 - ((compressedSizeAfter - compressionSizeBefore).toDouble() / blockRLPEncoded.size)
)

return BlobCompressor.AppendResult(success, compressionSizeBefore, compressedSizeAfter)
} finally {
returnDraftCompressor(draft)
}
}

override fun startNewBatch() {
goNativeBlobCompressor.StartNewBatch()
}

override fun getCompressedData(): ByteArray {
val compressedData = ByteArray(goNativeBlobCompressor.Len())
goNativeBlobCompressor.Bytes(compressedData)
return compressedData
}

override fun reset() {
goNativeBlobCompressor.Reset()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package net.consensys.linea.blob

import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import kotlin.random.Random
import kotlin.concurrent.thread

class DraftBlobCompressorTest {
private lateinit var compressor: DraftBlobCompressor
private val dataLimit: UInt = 1024u * 1024u // 1MB

@BeforeEach
fun setUp() {
compressor = DraftBlobCompressor.getInstance(BlobCompressorVersion.V1_0_1, dataLimit)
}

@Test
fun `test successful block compression`() {
val testData = Random.nextBytes(1000)

// Check if block can be appended
assertTrue(compressor.canAppendBlock(testData))

// Append block
val result = compressor.appendBlock(testData)
assertTrue(result.blockAppended)
assertTrue(result.compressedSizeAfter > result.compressedSizeBefore)

// Verify compressed data
val compressedData = compressor.getCompressedData()
assertNotNull(compressedData)
assertTrue(compressedData.isNotEmpty())
}

@Test
fun `test compression with data exceeding limit`() {
val testData = Random.nextBytes(dataLimit.toInt() + 1000)

// Check if block can be appended
assertFalse(compressor.canAppendBlock(testData))

// Try to append block
assertThrows<BlobCompressionException> {
compressor.appendBlock(testData)
}
}

@Test
fun `test multiple block compression`() {
val block1 = Random.nextBytes(500)
val block2 = Random.nextBytes(500)

// Append first block
val result1 = compressor.appendBlock(block1)
assertTrue(result1.blockAppended)

// Start new batch
compressor.startNewBatch()

// Append second block
val result2 = compressor.appendBlock(block2)
assertTrue(result2.blockAppended)

// Verify final compressed data
val compressedData = compressor.getCompressedData()
assertTrue(compressedData.size > result1.compressedSizeAfter)
}

@Test
fun `test reset functionality`() {
val testData = Random.nextBytes(1000)

// Append block
val result = compressor.appendBlock(testData)
assertTrue(result.blockAppended)

// Reset compressor
compressor.reset()

// Verify state is cleared
val compressedData = compressor.getCompressedData()
assertEquals(0, compressedData.size)
}

@Test
fun `test draft isolation`() {
val block1 = Random.nextBytes(500)
val block2 = Random.nextBytes(dataLimit.toInt()) // Block that won't fit

// Append first block successfully
val result1 = compressor.appendBlock(block1)
assertTrue(result1.blockAppended)
val sizeAfterBlock1 = compressor.getCompressedData().size

// Try to append second block (should fail)
assertFalse(compressor.canAppendBlock(block2))

// Verify original data is preserved
val finalData = compressor.getCompressedData()
assertEquals(sizeAfterBlock1, finalData.size)
}

@Test
fun `test concurrent compression using draft pool`() {
val numThreads = 5
val blockSize = 500
val threads = List(numThreads) { threadId ->
thread {
repeat(3) {
val block = Random.nextBytes(blockSize)
assertTrue(compressor.canAppendBlock(block))
val result = compressor.appendBlock(block)
assertTrue(result.blockAppended)
}
}
}

// Wait for all threads to complete
threads.forEach { it.join() }

// Verify final state
val finalData = compressor.getCompressedData()
assertTrue(finalData.isNotEmpty())
}

@Test
fun `test draft pool reuse`() {
val blocks = List(10) { Random.nextBytes(500) }

// Process blocks sequentially to test pool reuse
blocks.forEach { block ->
assertTrue(compressor.canAppendBlock(block))
val result = compressor.appendBlock(block)
assertTrue(result.blockAppended)
compressor.startNewBatch()
}

// Verify final state
val finalData = compressor.getCompressedData()
assertTrue(finalData.isNotEmpty())
}
}