Skip to content

Commit

Permalink
Extend MetricsFacade with createSimpleTimer() factory (#265)
Browse files Browse the repository at this point in the history
* Extend MetricsFacade with createSimpleTimer() factory

* Rename Timer interface to TimerCapture

* Use createSimpleTimer to create SimpleTimerCaptures throughout codebase

* Fix test

* Pass metrics facade

* feat: add createDynamicTagTimer and update MetricsFacade codes

* feat: update test cases in MicrometerMetricsFacadeTest

* feat: update JsonRpcMessageProcessor to not for computing metrics parsing error

* feat: add timer metric for calculating shnarf

* feat: add metric for block compression ratio and blob utilization ratio

* feat: revert back to internal val goNativeBlobCompressor

* fix: spotless issue

* fix: JsonRpcMessageProcessor constructor call

* fix: VertxHttpJsonRpcClientFactory constructor call

* feat: added the option for ratio metric in createHistogram

* feat: remove no-daemon option in transaction-exclusion-api build

* feat: revised metrics handling in JsonRpcMessageProcessor

* feat: changed tag value on error

* feat: return error cases in handleMessage as succeededFuture instead of failedFuture

* fix: use MicrometerMetricsFacade in VertxHttpJsonRpcClientFactory

---------

Co-authored-by: jonesho <[email protected]>
  • Loading branch information
bradbown and jonesho authored Jan 20, 2025
1 parent db31170 commit 734a2fd
Show file tree
Hide file tree
Showing 51 changed files with 578 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
uses: gradle/actions/setup-gradle@v4
- name: Build dist
run: |
./gradlew transaction-exclusion-api:app:installDist --no-daemon
./gradlew transaction-exclusion-api:app:installDist
- name: Login to Docker Hub
if: ${{ env.DOCKERHUB_USERNAME != '' && env.DOCKERHUB_TOKEN != '' }}
uses: docker/login-action@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
Vertx.vertx(vertxConfig)
}
private val meterRegistry: MeterRegistry = BackendRegistries.getDefaultNow()
private val micrometerMetricsFacade = MicrometerMetricsFacade(meterRegistry, "linea")
private val httpJsonRpcClientFactory = VertxHttpJsonRpcClientFactory(
vertx,
meterRegistry,
vertx = vertx,
metricsFacade = MicrometerMetricsFacade(meterRegistry),
requestResponseLogLevel = Level.TRACE,
failuresLogLevel = Level.WARN
)
Expand Down Expand Up @@ -111,8 +112,6 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
)
)

private val micrometerMetricsFacade = MicrometerMetricsFacade(meterRegistry, "linea")

private val l1FeeHistoriesRepository =
FeeHistoriesRepositoryImpl(
FeeHistoriesRepositoryImpl.Config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ class L1DependentApp(
val compressorVersion = configs.traces.blobCompressorVersion
val blobCompressor = GoBackedBlobCompressor.getInstance(
compressorVersion = compressorVersion,
dataLimit = configs.blobCompression.blobSizeLimit.toUInt()
dataLimit = configs.blobCompression.blobSizeLimit.toUInt(),
metricsFacade = metricsFacade
)

val compressedBlobCalculator = ConflationCalculatorByDataCompressed(
Expand Down Expand Up @@ -475,7 +476,10 @@ class L1DependentApp(
blobsRepository = blobsRepository,
blobCompressionProverClient = proverClientFactory.blobCompressionProverClient(),
rollingBlobShnarfCalculator = RollingBlobShnarfCalculator(
blobShnarfCalculator = GoBackedBlobShnarfCalculator(blobShnarfCalculatorVersion),
blobShnarfCalculator = GoBackedBlobShnarfCalculator(
version = blobShnarfCalculatorVersion,
metricsFacade = metricsFacade
),
blobsRepository = blobsRepository,
genesisShnarf = genesisStateProvider.shnarf
),
Expand Down Expand Up @@ -518,7 +522,7 @@ class L1DependentApp(
private val alreadySubmittedBlobsFilter =
L1ShnarfBasedAlreadySubmittedBlobsFilter(
lineaRollup = lineaSmartContractClientForDataSubmission,
acceptedBlobEndBlockNumberConsumer = { highestAcceptedBlobTracker }
acceptedBlobEndBlockNumberConsumer = { highestAcceptedBlobTracker(it) }
)

private val latestBlobSubmittedBlockNumberTracker = LatestBlobSubmittedBlockNumberTracker(0UL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import net.consensys.linea.BlockNumberAndHash
import net.consensys.linea.async.get
import net.consensys.linea.jsonrpc.client.JsonRpcClient
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import org.assertj.core.api.Assertions
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
Expand All @@ -35,9 +37,9 @@ class ShomeiClientTest {
wiremock = WireMockServer(WireMockConfiguration.options().dynamicPort())
wiremock.start()
meterRegistry = SimpleMeterRegistry()

val metricsFacade: MetricsFacade = MicrometerMetricsFacade(registry = meterRegistry, "linea")
val rpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, metricsFacade)
fakeShomeiServerURI = URI("http://127.0.0.1:" + wiremock.port()).toURL()
val rpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, meterRegistry)
jsonRpcClient = rpcClientFactory.create(fakeShomeiServerURI)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import net.consensys.linea.errors.ErrorResponse
import net.consensys.linea.jsonrpc.client.JsonRpcClient
import net.consensys.linea.jsonrpc.client.RequestRetryConfig
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.linea.traces.TracesCountersV1
import net.consensys.linea.traces.TracingModuleV1
import org.assertj.core.api.Assertions.assertThat
Expand Down Expand Up @@ -71,7 +73,8 @@ class TracesGeneratorJsonRpcClientV1Test {

fakeTracesServerUri = URI("http://127.0.0.1:" + wiremock.port()).toURL()
meterRegistry = SimpleMeterRegistry()
val rpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, meterRegistry)
val metricsFacade: MetricsFacade = MicrometerMetricsFacade(registry = meterRegistry, "linea")
val rpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, metricsFacade)
vertxHttpJsonRpcClient = rpcClientFactory.createWithRetries(
fakeTracesServerUri,
methodsToRetry = TracesGeneratorJsonRpcClientV1.retryableMethods,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import net.consensys.linea.errors.ErrorResponse
import net.consensys.linea.jsonrpc.client.JsonRpcClient
import net.consensys.linea.jsonrpc.client.RequestRetryConfig
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.linea.traces.TracesCountersV2
import net.consensys.linea.traces.TracingModuleV2
import org.assertj.core.api.Assertions.assertThat
Expand Down Expand Up @@ -66,7 +68,8 @@ class TracesGeneratorJsonRpcClientV2Test {

fakeTracesServerUri = URI("http://127.0.0.1:" + wiremock.port()).toURL()
meterRegistry = SimpleMeterRegistry()
val rpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, meterRegistry)
val metricsFacade: MetricsFacade = MicrometerMetricsFacade(registry = meterRegistry, "linea")
val rpcClientFactory = VertxHttpJsonRpcClientFactory(vertx, metricsFacade)
vertxHttpJsonRpcClient = rpcClientFactory.createWithRetries(
fakeTracesServerUri,
methodsToRetry = TracesGeneratorJsonRpcClientV2.retryableMethods,
Expand Down
1 change: 1 addition & 0 deletions coordinator/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
api project(':jvm-libs:generic:extensions:futures')
api "tech.pegasys.teku.internal:unsigned:${libs.versions.teku.get()}"
api "org.jetbrains.kotlinx:kotlinx-datetime:${libs.versions.kotlinxDatetime.get()}"
implementation project(":jvm-libs:linea:metrics:micrometer")
implementation "io.vertx:vertx-core"
// jackson shall never be used in the core module
// however, it is used already :( but was as transitive through Teku Execution Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ class BlobCompressionProofCoordinator(
private var timerId: Long? = null
private lateinit var blobPollingAction: Handler<Long>
private val blobsCounter = metricsFacade.createCounter(
LineaMetricsCategory.BLOB,
"counter",
"New blobs arriving to blob compression proof coordinator"
category = LineaMetricsCategory.BLOB,
name = "counter",
description = "New blobs arriving to blob compression proof coordinator"
)

init {
metricsFacade.createGauge(
LineaMetricsCategory.BLOB,
"compression.queue.size",
"Size of blob compression proving queue",
{ blobsToHandle.size }
category = LineaMetricsCategory.BLOB,
name = "compression.queue.size",
description = "Size of blob compression proving queue",
measurementSupplier = { blobsToHandle.size }
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import net.consensys.encodeHex
import net.consensys.linea.blob.BlobCompressorVersion
import net.consensys.linea.blob.GoNativeBlobCompressor
import net.consensys.linea.blob.GoNativeBlobCompressorFactory
import net.consensys.linea.metrics.LineaMetricsCategory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.TimerCapture
import org.apache.logging.log4j.LogManager
import kotlin.random.Random

Expand Down Expand Up @@ -35,7 +38,9 @@ interface BlobCompressor {
}

class GoBackedBlobCompressor private constructor(
internal val goNativeBlobCompressor: GoNativeBlobCompressor
internal val goNativeBlobCompressor: GoNativeBlobCompressor,
private val dataLimit: UInt,
private val metricsFacade: MetricsFacade
) : BlobCompressor {

companion object {
Expand All @@ -44,7 +49,8 @@ class GoBackedBlobCompressor private constructor(

fun getInstance(
compressorVersion: BlobCompressorVersion = BlobCompressorVersion.V0_1_0,
dataLimit: UInt
dataLimit: UInt,
metricsFacade: MetricsFacade
): GoBackedBlobCompressor {
if (instance == null) {
synchronized(this) {
Expand All @@ -57,7 +63,7 @@ class GoBackedBlobCompressor private constructor(
if (!initialized) {
throw InstantiationException(goNativeBlobCompressor.Error())
}
instance = GoBackedBlobCompressor(goNativeBlobCompressor)
instance = GoBackedBlobCompressor(goNativeBlobCompressor, dataLimit, metricsFacade)
} else {
throw IllegalStateException("Compressor singleton instance already created")
}
Expand All @@ -69,22 +75,52 @@ class GoBackedBlobCompressor private constructor(
}
}

private val canAppendBlockTimer: TimerCapture<Boolean> = metricsFacade.createSimpleTimer(
category = LineaMetricsCategory.BLOB,
name = "compressor.canappendblock",
description = "Time taken to check if block fits in current blob"
)
private val appendBlockTimer: TimerCapture<Boolean> = metricsFacade.createSimpleTimer(
category = LineaMetricsCategory.BLOB,
name = "compressor.appendblock",
description = "Time taken to compress block into current blob"
)
private val compressionRatioHistogram = metricsFacade.createHistogram(
category = LineaMetricsCategory.BLOB,
name = "block.compression.ratio",
description = "Block compression ratio measured in [0.0,1.0]",
isRatio = true
)
private val utilizationRatioHistogram = metricsFacade.createHistogram(
category = LineaMetricsCategory.BLOB,
name = "data.utilization.ratio",
description = "Data utilization ratio of a blob measured in [0.0,1.0]",
isRatio = true
)

private val log = LogManager.getLogger(GoBackedBlobCompressor::class.java)

override fun canAppendBlock(blockRLPEncoded: ByteArray): Boolean {
return goNativeBlobCompressor.CanWrite(blockRLPEncoded, blockRLPEncoded.size)
return canAppendBlockTimer.captureTime {
goNativeBlobCompressor.CanWrite(blockRLPEncoded, blockRLPEncoded.size)
}
}

override fun appendBlock(blockRLPEncoded: ByteArray): BlobCompressor.AppendResult {
val compressionSizeBefore = goNativeBlobCompressor.Len()
val appended = goNativeBlobCompressor.Write(blockRLPEncoded, blockRLPEncoded.size)
val appended = appendBlockTimer.captureTime {
goNativeBlobCompressor.Write(blockRLPEncoded, blockRLPEncoded.size)
}
val compressedSizeAfter = goNativeBlobCompressor.Len()
val compressionRatio = (1.0 - (compressedSizeAfter - compressionSizeBefore).toDouble() / blockRLPEncoded.size)
.also { compressionRatioHistogram.record(it) }

log.trace(
"block compressed: blockRlpSize={} compressionDataBefore={} compressionDataAfter={} compressionRatio={}",
blockRLPEncoded.size,
compressionSizeBefore,
compressedSizeAfter,
1.0 - ((compressedSizeAfter - compressionSizeBefore).toDouble() / blockRLPEncoded.size)
compressionRatio
)
val error = goNativeBlobCompressor.Error()
if (error != null) {
Expand All @@ -101,6 +137,7 @@ class GoBackedBlobCompressor private constructor(
override fun getCompressedData(): ByteArray {
val compressedData = ByteArray(goNativeBlobCompressor.Len())
goNativeBlobCompressor.Bytes(compressedData)
utilizationRatioHistogram.record(goNativeBlobCompressor.Len().toDouble() / dataLimit.toInt())
return compressedData
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package net.consensys.zkevm.ethereum.coordination.blob
import build.linea.domain.BlockIntervals
import net.consensys.decodeHex
import net.consensys.encodeHex
import net.consensys.linea.blob.CalculateShnarfResult
import net.consensys.linea.blob.GoNativeBlobShnarfCalculator
import net.consensys.linea.blob.GoNativeShnarfCalculatorFactory
import net.consensys.linea.blob.ShnarfCalculatorVersion
import net.consensys.linea.metrics.LineaMetricsCategory
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.TimerCapture
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.util.Base64
Expand Down Expand Up @@ -66,9 +70,17 @@ interface BlobShnarfCalculator {
}

class GoBackedBlobShnarfCalculator(
private val delegate: GoNativeBlobShnarfCalculator
private val delegate: GoNativeBlobShnarfCalculator,
private val metricsFacade: MetricsFacade
) : BlobShnarfCalculator {
constructor(version: ShnarfCalculatorVersion) : this(GoNativeShnarfCalculatorFactory.getInstance(version))
constructor(version: ShnarfCalculatorVersion, metricsFacade: MetricsFacade) :
this(GoNativeShnarfCalculatorFactory.getInstance(version), metricsFacade)

private val calculateShnarfTimer: TimerCapture<CalculateShnarfResult> = metricsFacade.createSimpleTimer(
category = LineaMetricsCategory.BLOB,
name = "shnarf.calculation",
description = "Time taken to calculate the shnarf hash of the given blob"
)

private val log: Logger = LogManager.getLogger(GoBackedBlobShnarfCalculator::class.java)

Expand Down Expand Up @@ -97,16 +109,18 @@ class GoBackedBlobShnarfCalculator(
conflationOrder
)

val result = delegate.CalculateShnarf(
eip4844Enabled = true,
compressedData = compressedDataB64,
parentStateRootHash = parentStateRootHash.encodeHex(),
finalStateRootHash = finalStateRootHash.encodeHex(),
prevShnarf = prevShnarf.encodeHex(),
conflationOrderStartingBlockNumber = conflationOrder.startingBlockNumber.toLong(),
conflationOrderUpperBoundariesLen = conflationOrder.upperBoundaries.size,
conflationOrderUpperBoundaries = conflationOrder.upperBoundaries.map { it.toLong() }.toLongArray()
)
val result = calculateShnarfTimer.captureTime {
delegate.CalculateShnarf(
eip4844Enabled = true,
compressedData = compressedDataB64,
parentStateRootHash = parentStateRootHash.encodeHex(),
finalStateRootHash = finalStateRootHash.encodeHex(),
prevShnarf = prevShnarf.encodeHex(),
conflationOrderStartingBlockNumber = conflationOrder.startingBlockNumber.toLong(),
conflationOrderUpperBoundariesLen = conflationOrder.upperBoundaries.size,
conflationOrderUpperBoundaries = conflationOrder.upperBoundaries.map { it.toLong() }.toLongArray()
)
}

if (result.errorMessage.isNotEmpty()) {
val errorMessage = "Error while calculating Shnarf. error=${result.errorMessage}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class ConflationServiceImpl(
internal val blocksToConflate = PriorityBlockingQueue<PayloadAndBlockCounters>()

private val blocksCounter = metricsFacade.createCounter(
LineaMetricsCategory.CONFLATION,
"blocks.imported",
"New blocks arriving to conflation service counter"
category = LineaMetricsCategory.CONFLATION,
name = "blocks.imported",
description = "New blocks arriving to conflation service counter"
)

init {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package net.consensys.zkevm.coordination.blob

import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import net.consensys.linea.blob.BlobCompressorVersion
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.zkevm.ethereum.coordination.blob.BlobCompressionException
import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobCompressor
import org.assertj.core.api.Assertions.assertThat
Expand All @@ -17,7 +20,13 @@ class GoBackedBlobCompressorTest {
companion object {
private const val DATA_LIMIT = 16 * 1024
private val TEST_DATA = loadTestData()
private val compressor = GoBackedBlobCompressor.getInstance(BlobCompressorVersion.V0_1_0, DATA_LIMIT.toUInt())
private val meterRegistry = SimpleMeterRegistry()
private val metricsFacade: MetricsFacade = MicrometerMetricsFacade(registry = meterRegistry, "linea")
private val compressor = GoBackedBlobCompressor.getInstance(
BlobCompressorVersion.V0_1_0,
DATA_LIMIT.toUInt(),
metricsFacade
)
private fun loadTestData(): Array<ByteArray> {
val data = GoBackedBlobCompressorTest::class.java.getResourceAsStream("rlp_blocks.bin")!!.readAllBytes()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package net.consensys.zkevm.coordination.blob

import build.linea.domain.BlockIntervals
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import net.consensys.decodeHex
import net.consensys.encodeHex
import net.consensys.linea.blob.CalculateShnarfResult
import net.consensys.linea.blob.GoNativeBlobShnarfCalculator
import net.consensys.linea.metrics.MetricsFacade
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.zkevm.ethereum.coordination.blob.GoBackedBlobShnarfCalculator
import net.consensys.zkevm.ethereum.coordination.blob.ShnarfResult
import org.apache.tuweni.bytes.Bytes32
Expand All @@ -22,6 +25,8 @@ import org.mockito.kotlin.whenever
class GoBackedCalculateShnarfCalculatorTest {
private lateinit var delegate: GoNativeBlobShnarfCalculator
private lateinit var calculator: GoBackedBlobShnarfCalculator
private val meterRegistry = SimpleMeterRegistry()
private val metricsFacade: MetricsFacade = MicrometerMetricsFacade(registry = meterRegistry, "linea")
private val compressedData = byteArrayOf(0b01001101, 0b01100001, 0b01101110)
private val compressedDataBase64String = "TWFu"
private val parentStateRootHash = Bytes32.random().toArray()
Expand Down Expand Up @@ -52,7 +57,7 @@ class GoBackedCalculateShnarfCalculatorTest {
@BeforeEach
fun beforeEach() {
delegate = mock()
calculator = GoBackedBlobShnarfCalculator(delegate)
calculator = GoBackedBlobShnarfCalculator(delegate, metricsFacade)
}

@Test
Expand Down
Loading

0 comments on commit 734a2fd

Please sign in to comment.