diff --git a/CHANGELOG.md b/CHANGELOG.md index 06eb644f1..25dd586c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ Lists all changes with user impact. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). +## [0.22.2] +### Changed +- Migrated metrics to prometheus ## [0.22.1] ### Changed diff --git a/build.gradle b/build.gradle index dd88f1c24..3db0cfda2 100644 --- a/build.gradle +++ b/build.gradle @@ -55,7 +55,7 @@ allprojects { bytebuddy : '1.15.1', re2j : '1.3', xxhash : '0.10.1', - dropwizard : '4.2.26' + dropwizard : '4.2.26', ] dependencyManagement { diff --git a/docs/deployment/observability.md b/docs/deployment/observability.md index ebce6bb30..054ef5b83 100644 --- a/docs/deployment/observability.md +++ b/docs/deployment/observability.md @@ -5,13 +5,17 @@ Envoy Control uses [SLF4J](https://www.slf4j.org/) with [Logback](https://logback.qos.ch/) for logging. To override the default settings, point a file via environment variable + ```bash export ENVOY_CONTROL_RUNNER_OPTS="-Dlogging.config=/path/to/logback/logback.xml" ``` + and then run the `bin/envoy-control-runner` created from `distZip` task. `java-control-plane` produces quite a lot of logging on `INFO` level. Consider switching it to `WARN` + ```xml + ``` @@ -25,13 +29,12 @@ Sample logger configuration is available here. 
### Envoy Control -Metric | Description -----------------------------| ----------------------------------- -**services.added** | Counter of added services events -**services.removed** | Counter of removed services events -**services.instanceChanged** | Counter of instance change events +Metric | Description | Labels +----------------------|------------------------------------|-------------------------------- +**watch** | Counter of watched services events | status (added/removed/instances-changed/snapshot-changed), watch-type, metric-emitter -Standard [Spring metrics](https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html#production-ready-metrics-meter) (JVM, CPU, HTTP server) are also included. +Standard [Spring metrics](https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html#production-ready-metrics-meter) +(JVM, CPU, HTTP server) are also included. ### Envoy Control Runner @@ -39,41 +42,24 @@ Envoy Control Runner exposes a set of metrics on standard Spring Actuator's `/ac #### xDS connections -Metric | Description -----------------------------| -------------------------------------------------------- -**grpc.connections.ads** | Number of running gRPC ADS connections -**grpc.connections.cds** | Number of running gRPC CDS connections -**grpc.connections.eds** | Number of running gRPC EDS connections -**grpc.connections.lds** | Number of running gRPC LDS connections -**grpc.connections.rds** | Number of running gRPC RDS connections -**grpc.connections.sds** | Number of running gRPC SDS connections -**grpc.connections.unknown** | Number of running gRPC connections for unknown resource + Metric | Description | Labels +----------------------|----------------------------------------------------|------------------------------------ + **connections** | Number of running gRPC connections of a given type | stream-type (ads/cds/eds/lds/rds/sds/unknown), connection-type (grpc) #### xDS requests -Metric 
| Description -------------------------------- | -------------------------------------------------------- -**grpc.requests.cds** | Counter of received gRPC CDS requests -**grpc.requests.eds** | Counter of received gRPC EDS requests -**grpc.requests.lds** | Counter of received gRPC LDS requests -**grpc.requests.rds** | Counter of received gRPC RDS requests -**grpc.requests.sds** | Counter of received gRPC SDS requests -**grpc.requests.unknown** | Counter of received gRPC requests for unknown resource -**grpc.requests.cds.delta** | Counter of received gRPC delta CDS requests -**grpc.requests.eds.delta** | Counter of received gRPC delta EDS requests -**grpc.requests.lds.delta** | Counter of received gRPC delta LDS requests -**grpc.requests.rds.delta** | Counter of received gRPC delta RDS requests -**grpc.requests.sds.delta** | Counter of received gRPC delta SDS requests -**grpc.requests.unknown.delta** | Counter of received gRPC delta requests for unknown resource + Metric | Description | Labels +-------------------------|---------------------------------------------------|-------------------------------------------------------------- + **requests.total** | Counter of received gRPC requests of a given type | stream-type (cds/eds/lds/rds/sds/unknown), connection-type (grpc), discovery-request-type (total/delta) #### Snapshot -Metric | Description --------------------------| ---------------------------------- -**cache.groupCount** | Number of unique groups in SnapshotCache + Metric | Description | Labels +------------------------|------------------------------------------|-------- + **cache.groups.count** | Number of unique groups in SnapshotCache | - #### Synchronization -Metric | Description -----------------------------------------| ------------------------------------------------- -**cross-dc-synchronization.$dc.errors** | Counter of synchronization errors for given DC + Metric | Description | Labels 
+-------------------------------------------|----------------------------------------------------------------|---------------------------------------------- + **errors.total** | Counter of synchronization errors for a given DC and operation | cluster, operation (get-instances/get-state) diff --git a/envoy-control-core/build.gradle b/envoy-control-core/build.gradle index 6d4295acf..d430d0b56 100644 --- a/envoy-control-core/build.gradle +++ b/envoy-control-core/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect' api group: 'io.dropwizard.metrics', name: 'metrics-core', version: versions.dropwizard api group: 'io.micrometer', name: 'micrometer-core' + implementation group: 'com.google.re2j', name: 're2j', version: versions.re2j api group: 'io.envoyproxy.controlplane', name: 'server', version: versions.java_controlplane diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index d03592962..769f992d7 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.server.callback.SnapshotCollectingCallback import io.grpc.Server import io.grpc.netty.NettyServerBuilder import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioServerSocketChannel @@ -221,10 +222,12 @@ class ControlPlane private constructor( nioEventLoopExecutor ) ) - .bossEventLoopGroup(NioEventLoopGroup( - properties.server.nioBossEventLoopThreadCount, - nioBossEventLoopExecutor - )) + .bossEventLoopGroup( + NioEventLoopGroup( + 
properties.server.nioBossEventLoopThreadCount, + nioBossEventLoopExecutor + ) + ) .channelType(NioServerSocketChannel::class.java) .executor(grpcServerExecutor) .keepAliveTime(properties.server.netty.keepAliveTime.toMillis(), TimeUnit.MILLISECONDS) @@ -410,7 +413,11 @@ class ControlPlane private constructor( } private fun meterExecutor(executor: ExecutorService, executorServiceName: String) { - ExecutorServiceMetrics(executor, executorServiceName, executorServiceName, emptySet()) + ExecutorServiceMetrics( + executor, + executorServiceName, + Tags.of("executor", executorServiceName) + ) .bindTo(meterRegistry) } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 522d96eae..56cd44685 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -11,6 +11,8 @@ import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.util.function.Consumer @@ -34,9 +36,14 @@ internal class GroupChangeWatcher( fun onGroupAdded(): Flux> { return groupsChanged - .measureBuffer("group-change-watcher-emitted", meterRegistry) + .measureBuffer("group-change-watcher", meterRegistry) .checkpoint("group-change-watcher-emitted") - .name("group-change-watcher-emitted").metrics() + .name(REACTOR_METRIC) + .tag(WATCH_TYPE_TAG, "group") + .metrics() + 
.doOnSubscribe { + logger.info("Watching group changes") + } .doOnCancel { logger.warn("Cancelling watching group changes") } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt index 3503b70a4..2ce63e1ee 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt @@ -12,6 +12,7 @@ import java.util.function.Supplier import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer import io.micrometer.core.instrument.Timer +import pl.allegro.tech.servicemesh.envoycontrol.utils.PROTOBUF_CACHE_METRIC internal class CachedProtoResourcesSerializer( private val meterRegistry: MeterRegistry, @@ -27,7 +28,7 @@ internal class CachedProtoResourcesSerializer( } private val cache: Cache = createCache("protobuf-cache") - private val timer = createTimer(reportMetrics, meterRegistry, "protobuf-cache.serialize.time") + private val timer = createTimer(reportMetrics, meterRegistry, PROTOBUF_CACHE_METRIC) private fun createCache(cacheName: String): Cache { return if (reportMetrics) { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt index c0f65410b..4121df56a 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt @@ -5,6 +5,12 @@ import 
io.envoyproxy.controlplane.server.DiscoveryServerCallbacks import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG import java.util.concurrent.atomic.AtomicInteger class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks { @@ -34,9 +40,12 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) .map { type -> type to AtomicInteger(0) } .toMap() - meterRegistry.gauge("grpc.all-connections", connections) connectionsByType.forEach { (type, typeConnections) -> - meterRegistry.gauge("grpc.connections.${type.name.toLowerCase()}", typeConnections) + meterRegistry.gauge( + CONNECTIONS_METRIC, + Tags.of(CONNECTION_TYPE_TAG, "grpc", STREAM_TYPE_TAG, type.name.lowercase()), + typeConnections + ) } } @@ -51,7 +60,14 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) } override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) { - meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}") + meterRegistry.counter( + REQUESTS_METRIC, + Tags.of( + CONNECTION_TYPE_TAG, "grpc", + STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), + DISCOVERY_REQ_TYPE_TAG, "total" + ) + ) .increment() } @@ -59,7 +75,14 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) streamId: Long, request: 
V3DeltaDiscoveryRequest ) { - meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}.delta") + meterRegistry.counter( + REQUESTS_METRIC, + Tags.of( + CONNECTION_TYPE_TAG, "grpc", + STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), + DISCOVERY_REQ_TYPE_TAG, "delta" + ) + ) .increment() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt index c56165dad..4e80857d7 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt @@ -7,6 +7,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.Timer import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode @@ -24,6 +25,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.Envo import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.EnvoyListenersFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEgressRoutesFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIngressRoutesFactory +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_FACTORY_SECONDS_METRIC import java.util.SortedMap class EnvoySnapshotFactory( @@ -67,7 +69,12 @@ class EnvoySnapshotFactory( endpoints = endpoints, properties = properties.outgoingPermissions ) - 
sample.stop(meterRegistry.timer("snapshot-factory.new-snapshot.time")) + sample.stop( + meterRegistry.timer( + SNAPSHOT_FACTORY_SECONDS_METRIC, + Tags.of("operation", "new-snapshot", "type", "global") + ) + ) return snapshot } @@ -155,7 +162,12 @@ class EnvoySnapshotFactory( val groupSample = Timer.start(meterRegistry) val newSnapshotForGroup = newSnapshotForGroup(group, globalSnapshot) - groupSample.stop(meterRegistry.timer("snapshot-factory.get-snapshot-for-group.time")) + groupSample.stop( + meterRegistry.timer( + SNAPSHOT_FACTORY_SECONDS_METRIC, + Tags.of("operation", "new-snapshot", "type", "group") + ) + ) return newSnapshotForGroup } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 3baea120d..2ca421461 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -3,13 +3,23 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot import io.envoyproxy.controlplane.cache.SnapshotCache import io.envoyproxy.controlplane.cache.v3.Snapshot import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.Timer import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState +import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import 
pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer @@ -50,9 +60,13 @@ class SnapshotUpdater( // step 2: only watches groups. if groups change we use the last services state and update those groups groups().subscribeOn(globalSnapshotScheduler) ) - .measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2) + .measureBuffer("snapshot-updater", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name("snapshot-updater-merged").metrics() + .name(REACTOR_METRIC) + .tag(METRIC_EMITTER_TAG, "snapshot-updater") + .tag(SNAPSHOT_STATUS_TAG, "merged") + .tag(UPDATE_TRIGGER_TAG, "global") + .metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -87,14 +101,22 @@ class SnapshotUpdater( // see GroupChangeWatcher return onGroupAdded .publishOn(globalSnapshotScheduler) - .measureBuffer("snapshot-updater-groups-published", meterRegistry) + .measureBuffer("snapshot-updater", meterRegistry) .checkpoint("snapshot-updater-groups-published") - .name("snapshot-updater-groups-published").metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } + .name(REACTOR_METRIC) + 
.tag(METRIC_EMITTER_TAG, "snapshot-updater") + .tag(SNAPSHOT_STATUS_TAG, "published") + .tag(UPDATE_TRIGGER_TAG, "groups") + .metrics() .onErrorResume { e -> - meterRegistry.counter("snapshot-updater.groups.updates.errors").increment() + meterRegistry.counter( + ERRORS_TOTAL_METRIC, + Tags.of(UPDATE_TRIGGER_TAG, "groups", METRIC_EMITTER_TAG, "snapshot-updater") + ) + .increment() logger.error("Unable to process new group", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) } @@ -102,13 +124,18 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name("snapshot-updater-services-sampled").metrics() - .onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry) + .name(REACTOR_METRIC) + .tag(UPDATE_TRIGGER_TAG, "services") + .tag(STATUS_TAG, "sampled") + .onBackpressureLatestMeasured("snapshot-updater", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) - .measureBuffer("snapshot-updater-services-published", meterRegistry) + .measureBuffer("snapshot-updater", meterRegistry) .checkpoint("snapshot-updater-services-published") - .name("snapshot-updater-services-published").metrics() + .name(REACTOR_METRIC) + .tag(UPDATE_TRIGGER_TAG, "services") + .tag(STATUS_TAG, "published") + .metrics() .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? 
= null @@ -135,14 +162,19 @@ class SnapshotUpdater( } .filter { it != emptyUpdateResult } .onErrorResume { e -> - meterRegistry.counter("snapshot-updater.services.updates.errors").increment() + meterRegistry.counter( + ERRORS_TOTAL_METRIC, + Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services") + ).increment() logger.error("Unable to process service changes", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) } } private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) { - meterRegistry.timer("snapshot-updater.set-snapshot.$serviceName.time") + meterRegistry.timer( + SIMPLE_CACHE_METRIC, Tags.of(SERVICE_TAG, serviceName, OPERATION_TAG, "set-snapshot") + ) } else { noopTimer } @@ -154,12 +186,19 @@ class SnapshotUpdater( cache.setSnapshot(group, groupSnapshot) } } catch (e: Throwable) { - meterRegistry.counter("snapshot-updater.services.${group.serviceName}.updates.errors").increment() + meterRegistry.counter( + ERRORS_TOTAL_METRIC, + Tags.of( + SERVICE_TAG, group.serviceName, + OPERATION_TAG, "create-snapshot", + METRIC_EMITTER_TAG, "snapshot-updater" + ) + ).increment() logger.error("Unable to create snapshot for group ${group.serviceName}", e) } } - private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot-updater.update-snapshot-for-groups.time") + private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds") private fun updateSnapshotForGroups( groups: Collection, @@ -174,10 +213,12 @@ class SnapshotUpdater( } else if (result.xdsSnapshot != null && group.communicationMode == XDS) { updateSnapshotForGroup(group, result.xdsSnapshot) } else { - meterRegistry.counter("snapshot-updater.communication-mode.errors").increment() - logger.error("Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + - "Handling Envoy with not supported communication mode should have been rejected before." 
+ - " Please report this to EC developers.") + meterRegistry.counter(ERRORS_TOTAL_METRIC, Tags.of("type", "communication-mode")).increment() + logger.error( + "Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + + "Handling Envoy with not supported communication mode should have been rejected before." + + " Please report this to EC developers." + ) } } return results.then(Mono.fromCallable { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 4a99ad39a..912b6f57f 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -1,9 +1,12 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured @@ -15,9 +18,10 @@ class GlobalStateChanges( private val meterRegistry: MeterRegistry, private val properties: SyncProperties ) { - private val 
scheduler = Schedulers.newBoundedElastic( - Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator" - ) + private val scheduler = + Schedulers.newBoundedElastic( + Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator" + ) fun combined(): Flux { val clusterStatesStreams: List> = clusterStateChanges.map { it.stream() } @@ -41,9 +45,11 @@ class GlobalStateChanges( .toMultiClusterState() } .logSuppressedError("combineLatest() suppressed exception") - .measureBuffer("global-service-changes-combine-latest", meterRegistry) + .measureBuffer("global-service-changes-combinator", meterRegistry) .checkpoint("global-service-changes-emitted") - .name("global-service-changes-emitted").metrics() + .name(REACTOR_METRIC) + .tag(METRIC_EMITTER_TAG, "global-service-changes-combinator") + .metrics() } private fun combinedExperimentalFlow( @@ -70,10 +76,13 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combine-latest", meterRegistry) .checkpoint("global-service-changes-emitted") - .name("global-service-changes-emitted").metrics() + .name(REACTOR_METRIC) + .tag(METRIC_EMITTER_TAG, "global-service-changes") + .tag(CHECKPOINT_TAG, "emitted") .onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry) .publishOn(scheduler, 1) .checkpoint("global-service-changes-published") - .name("global-service-changes-published").metrics() + .tag(CHECKPOINT_TAG, "published") + .metrics() } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt index 6ef67a998..ee85877b8 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt +++ 
b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt @@ -3,6 +3,8 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import reactor.core.publisher.Flux class RemoteClusterStateChanges( @@ -14,5 +16,7 @@ class RemoteClusterStateChanges( .getChanges(properties.sync.pollingInterval) .startWith(MultiClusterState.empty()) .distinctUntilChanged() - .name("cross-dc-changes-distinct").metrics() + .name(REACTOR_METRIC) + .tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation") + .metrics() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt index 3782c7952..092e0a077 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt @@ -1,12 +1,20 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.Locality import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState import 
pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_CANCELLED_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_SECONDS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_TOTAL_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.lang.Integer.max @@ -29,13 +37,17 @@ class RemoteServices( fun getChanges(interval: Long): Flux { val aclFlux: Flux = Flux.create({ sink -> scheduler.scheduleWithFixedDelay({ - meterRegistry.timer("sync-dc.get-multi-cluster-states.time").recordCallable { - getChanges(sink::next, interval) - } + meterRegistry.timer( + CROSS_DC_SYNC_SECONDS_METRIC, + Tags.of(OPERATION_TAG, "get-multi-cluster-state") + ) + .recordCallable { + getChanges(sink::next, interval) + } }, 0, interval, TimeUnit.SECONDS) }, FluxSink.OverflowStrategy.LATEST) return aclFlux.doOnCancel { - meterRegistry.counter("cross-dc-synchronization.cancelled").increment() + meterRegistry.counter(CROSS_DC_SYNC_CANCELLED_METRIC).increment() logger.warn("Cancelling cross dc sync") } } @@ -59,7 +71,14 @@ class RemoteServices( .thenApply { servicesStateFromCluster(cluster, it) } .orTimeout(interval, TimeUnit.SECONDS) .exceptionally { - meterRegistry.counter("cross-dc-synchronization.$cluster.state-fetcher.errors").increment() + meterRegistry.counter( + ERRORS_TOTAL_METRIC, + Tags.of( + CLUSTER_TAG, cluster, + OPERATION_TAG, "get-state", + METRIC_EMITTER_TAG, "cross-dc-synchronization" + ) + ).increment() logger.warn("Error synchronizing instances ${it.message}", it) clusterStateCache[cluster] } @@ -70,7 +89,14 @@ class RemoteServices( val 
instances = controlPlaneInstanceFetcher.instances(cluster) cluster to instances } catch (e: Exception) { - meterRegistry.counter("cross-dc-synchronization.$cluster.instance-fetcher.errors").increment() + meterRegistry.counter( + ERRORS_TOTAL_METRIC, + Tags.of( + CLUSTER_TAG, cluster, + OPERATION_TAG, "get-instances", + METRIC_EMITTER_TAG, "cross-dc-synchronization" + ) + ).increment() logger.warn("Failed fetching instances from $cluster", e) cluster to emptyList() } @@ -80,7 +106,10 @@ class RemoteServices( cluster: String, state: ServicesState ): ClusterState { - meterRegistry.counter("cross-dc-service-update-$cluster").increment() + meterRegistry.counter( + CROSS_DC_SYNC_TOTAL_METRIC, Tags.of(CLUSTER_TAG, cluster) + ) + .increment() val clusterState = ClusterState( state.removeServicesWithoutInstances(), Locality.REMOTE, diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt index ce8f380d9..6a99f0fa0 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt @@ -5,3 +5,31 @@ import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.noop.NoopTimer val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER)) +const val REACTOR_METRIC = "reactor" +const val ERRORS_TOTAL_METRIC = "errors.total" +const val CONNECTIONS_METRIC = "connections" +const val REQUESTS_METRIC = "requests.total" +const val WATCH_METRIC = "watch" +const val ENVOY_CONTROL_WARM_UP_METRIC = "envoy.control.warmup.seconds" +const val CROSS_DC_SYNC_METRIC = "cross.dc.synchronization" +const val CROSS_DC_SYNC_CANCELLED_METRIC = "$CROSS_DC_SYNC_METRIC.cancelled.total" +const val CROSS_DC_SYNC_SECONDS_METRIC = "$CROSS_DC_SYNC_METRIC.seconds" +const val 
CROSS_DC_SYNC_TOTAL_METRIC = "$CROSS_DC_SYNC_METRIC.total" +const val SIMPLE_CACHE_METRIC = "simple.cache.duration.seconds" +const val PROTOBUF_CACHE_METRIC = "protobuf.cache.serialize.time" +const val CACHE_GROUP_COUNT_METRIC = "cache.groups.count" +const val SNAPSHOT_FACTORY_SECONDS_METRIC = "snapshot.factory.seconds" + +const val CONNECTION_TYPE_TAG = "connection-type" +const val STREAM_TYPE_TAG = "stream-type" +const val CHECKPOINT_TAG = "checkpoint" +const val WATCH_TYPE_TAG = "watch-type" +const val DISCOVERY_REQ_TYPE_TAG = "discovery-request-type" +const val METRIC_TYPE_TAG = "metric-type" +const val METRIC_EMITTER_TAG = "metric-emitter" +const val SNAPSHOT_STATUS_TAG = "snapshot-status" +const val UPDATE_TRIGGER_TAG = "update-trigger" +const val SERVICE_TAG = "service" +const val OPERATION_TAG = "operation" +const val CLUSTER_TAG = "cluster" +const val STATUS_TAG = "status" diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt index 00fccdc2d..ab4806a09 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.reactivestreams.Subscription import org.slf4j.LoggerFactory import reactor.core.Disposable @@ -49,7 +50,13 @@ fun Flux.measureBuffer( * operator and calculate difference between them */ fun Flux.measureDiscardedItems(name: String, meterRegistry: MeterRegistry): Flux = this - .doOnDiscard(Any::class.java) { meterRegistry.counter("reactor-discarded-items.$name").increment() } + .doOnDiscard(Any::class.java) { + meterRegistry.counter( + REACTOR_METRIC, + METRIC_TYPE_TAG, 
"discarded-items", + METRIC_EMITTER_TAG, name + ).increment() + } fun Flux.onBackpressureLatestMeasured(name: String, meterRegistry: MeterRegistry): Flux = measureDiscardedItems("$name-before", meterRegistry) @@ -105,7 +112,12 @@ private fun measureQueueSubscriptionBuffer( name: String, meterRegistry: MeterRegistry ) { - meterRegistry.gauge(bufferMetric(name), subscription, queueSubscriptionBufferExtractor) + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name), + subscription, + queueSubscriptionBufferExtractor + ) } private fun measureScannableBuffer( @@ -116,12 +128,19 @@ private fun measureScannableBuffer( ) { val buffered = scannable.scan(Scannable.Attr.BUFFERED) if (buffered == null) { - logger.error("Cannot register metric '${bufferMetric(name)}'. Buffer size not available. " + - "Use measureBuffer() only on supported reactor operators") + logger.error( + "Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " + + "Use measureBuffer() only on supported reactor operators" + ) return } - meterRegistry.gauge(bufferMetric(name), scannable, scannableBufferExtractor) + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name), + scannable, + scannableBufferExtractor + ) /** * Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual @@ -131,7 +150,12 @@ private fun measureScannableBuffer( * be available, so it must be stated explicitly as innerSources parameter. 
*/ for (i in 0 until innerSources) { - meterRegistry.gauge("${bufferMetric(name)}_$i", scannable, innerBufferExtractor(i)) + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"), + scannable, + innerBufferExtractor(i) + ) } } @@ -142,9 +166,10 @@ private fun innerBufferExtractor(index: Int) = { s: Scannable -> ?.let(scannableBufferExtractor) ?: -1.0 } -private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> -> s.size.toDouble() } -private fun bufferMetric(name: String) = "reactor-buffers.$name" +private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> -> + s.size.toDouble() +} sealed class ParallelizableScheduler object DirectScheduler : ParallelizableScheduler() @@ -160,6 +185,7 @@ fun Flux.doOnNextScheduledOn( is DirectScheduler -> { doOnNext(doOnNext) } + is ParallelScheduler -> { this.parallel(scheduler.parallelism) .runOn(scheduler.scheduler) diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt index 2a91824e1..bea774bcc 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt @@ -1,5 +1,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.metrics +import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test @@ -25,13 +26,25 @@ class ThreadPoolMetricTest { controlPlane.start() // then - val allMeterNames = meterRegistry.meters.map { it.id.name } - val requiredMeterNames = listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap { 
- listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size") + val metricNames = listOf("executor.completed", "executor.active", "executor.queued", "executor.pool.size") + + val executorNames = listOf( + "grpc-server-worker", + "grpc-worker-event-loop", + "snapshot-update", + "group-snapshot" + ).associateWith { metricNames } + + assertThat(executorNames.entries).allSatisfy { + assertThat(it.value.all { metricName -> + meterRegistry.meters.any { meter -> + meter.id.name == metricName && meter.id.tags.contains( + Tag.of("executor", it.key) + ) + } + }).isTrue() } - assertThat(allMeterNames).containsAll(requiredMeterNames) - // and controlPlane.close() } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index 0cba50150..6c0a14939 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -16,6 +16,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener import io.envoyproxy.envoy.config.route.v3.RetryPolicy import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test @@ -59,6 +60,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.DirectScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.any +import 
pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers @@ -468,7 +473,8 @@ class SnapshotUpdaterTest { val snapshot = cache.getSnapshot(servicesGroup) assertThat(snapshot).isEqualTo(null) assertThat( - simpleMeterRegistry.find("snapshot-updater.services.example-service.updates.errors") + simpleMeterRegistry.find(ERRORS_TOTAL_METRIC) + .tags(Tags.of(SERVICE_TAG, "example-service", OPERATION_TAG, "create-snapshot", METRIC_EMITTER_TAG, "snapshot-updater")) .counter()?.count() ).isEqualTo(1.0) } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt index ab0830e56..c2f43c8f0 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt @@ -1,9 +1,11 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.fail +import org.testcontainers.shaded.org.awaitility.Awaitility import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers import java.util.concurrent.CountDownLatch @@ -24,10 +26,15 @@ class ReactorUtilsTest { .subscribeRequestingN(n = 5) // then - assertThat(received.await(2, TimeUnit.SECONDS)).isTrue() - - val buffered = meterRegistry["reactor-buffers.publish"].gauge() - 
assertThat(buffered.value()).isEqualTo(15.0) + assertThat(received.await(5, TimeUnit.SECONDS)).isTrue() + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted { + assertThat( + meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "publish")) + .gauge()?.value() + ).isEqualTo(15.0) + } } @Test @@ -43,9 +50,12 @@ class ReactorUtilsTest { // then assertThat(received.await(2, TimeUnit.SECONDS)).isTrue() - val sourcesCount = meterRegistry["reactor-buffers.merge"].gauge().value() - val source0Buffered = meterRegistry["reactor-buffers.merge_0"].gauge().value() - val source1Buffered = meterRegistry["reactor-buffers.merge_1"].gauge().value() + val sourcesCount = meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "merge")).gauge()?.value() + val source0Buffered = meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "merge_0")).gauge()?.value() + val source1Buffered = meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "merge_1")).gauge()?.value() assertThat(sourcesCount).isEqualTo(2.0) // 12 published minus 5 requested = 7 @@ -67,10 +77,11 @@ class ReactorUtilsTest { // then assertThat(received.await(2, TimeUnit.SECONDS)).isTrue() - val buffered = meterRegistry["reactor-buffers.combine"].gauge().value() + val result = meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "combine")).gauge()?.value() // only two last items from source1 are undelivered (6 produces - 4 requested = 2) - assertThat(buffered).isEqualTo(2.0) + assertThat(result).isEqualTo(2.0) } @Test @@ -87,8 +98,10 @@ class ReactorUtilsTest { // then assertThat(received.await(2, TimeUnit.SECONDS)).isTrue() - val discardedItemsBeforeBackpressure = meterRegistry["reactor-discarded-items.latest-before"].counter().count() - val discardedItemsAfterBackpressure = 
meterRegistry["reactor-discarded-items.latest"].counter().count() + val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest-before")).counter()?.count() + val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_METRIC) + .tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest")).counter()?.count() /** * Published by range: (0..10) @@ -97,7 +110,9 @@ class ReactorUtilsTest { * Not dispatched to subscriber, received by onBackpressure: (4, 6, 8) * Discarded by onBackpressure: (4, 6) */ - assertThat(discardedItemsAfterBackpressure - discardedItemsBeforeBackpressure).isEqualTo(2.0) + assertThat(discardedItemsBeforeBackpressure).isNotNull() + assertThat(discardedItemsAfterBackpressure).isNotNull() + assertThat(discardedItemsAfterBackpressure!! - discardedItemsBeforeBackpressure!!).isEqualTo(2.0) } private fun Flux.subscribeRequestingN(n: Int): CountDownLatch { diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt index 6845fcb5a..d54743e20 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.infrastructure import com.ecwid.consul.v1.ConsulClient import com.fasterxml.jackson.databind.ObjectMapper import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import
org.springframework.boot.context.properties.ConfigurationProperties @@ -40,6 +41,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServi import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges +import pl.allegro.tech.servicemesh.envoycontrol.utils.CACHE_GROUP_COUNT_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG import reactor.core.scheduler.Schedulers import java.net.URI @@ -171,15 +178,32 @@ class ControlPlaneConfig { fun localDatacenter(properties: ConsulProperties) = ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local" - fun controlPlaneMetrics(meterRegistry: MeterRegistry) = - DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { - meterRegistry.gauge("services.added", it.servicesAdded) - meterRegistry.gauge("services.removed", it.servicesRemoved) - meterRegistry.gauge("services.instanceChanged", it.instanceChanges) - meterRegistry.gauge("services.snapshotChanged", it.snapshotChanges) - meterRegistry.gauge("cache.groupsCount", it.cacheGroupsCount) - it.meterRegistry.more().counter("services.watch.errors", listOf(), it.errorWatchingServices) + fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics { + return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added", WATCH_TYPE_TAG, "service"), it.servicesAdded) + meterRegistry.gauge( + WATCH_METRIC, + 
Tags.of(STATUS_TAG, "removed", WATCH_TYPE_TAG, "service"), + it.servicesRemoved + ) + meterRegistry.gauge( + WATCH_METRIC, + Tags.of(STATUS_TAG, "instance-changed", WATCH_TYPE_TAG, "service"), + it.instanceChanges + ) + meterRegistry.gauge( + WATCH_METRIC, + Tags.of(STATUS_TAG, "snapshot-changed", WATCH_TYPE_TAG, "service"), + it.snapshotChanges + ) + meterRegistry.gauge(CACHE_GROUP_COUNT_METRIC, it.cacheGroupsCount) + it.meterRegistry.more().counter( + ERRORS_TOTAL_METRIC, + Tags.of(METRIC_EMITTER_TAG, WATCH_METRIC, WATCH_TYPE_TAG, "service"), + it.errorWatchingServices + ) } + } @Bean fun protobufJsonFormatHttpMessageConverter(): ProtobufJsonFormatHttpMessageConverter { diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt index 557795f7b..73f6e3d0f 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.springframework.web.client.RestTemplate import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState import java.net.URI @@ -32,14 +33,17 @@ class RestTemplateControlPlaneClient( } private fun timed(function: () -> T): T { - return meterRegistry.timer("sync-dc.get-state.time").record(function) + return meterRegistry.timer("cross.dc.synchronization.seconds", Tags.of("operation", "get-state")) + .record(function) } private fun success() { - meterRegistry.counter("sync-dc.get-state.success").increment() + 
meterRegistry.counter("cross.dc.synchronization", Tags.of("operation", "get-state", "status", "success")) + .increment() } private fun failure() { - meterRegistry.counter("sync-dc.get-state.failure").increment() + meterRegistry.counter("cross.dc.synchronization", Tags.of("operation", "get-state", "status", "failure")) + .increment() } } diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt index 118ef417e..39b01cc79 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt @@ -13,7 +13,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.time.Duration @@ -51,11 +55,14 @@ class ConsulServiceChanges( }, FluxSink.OverflowStrategy.LATEST ) - .measureDiscardedItems("consul-service-changes-emitted", metrics.meterRegistry) + .measureDiscardedItems("consul-service-changes", metrics.meterRegistry) .checkpoint("consul-service-changes-emitted") - 
.name("consul-service-changes-emitted").metrics() + .name(REACTOR_METRIC) + .tag(METRIC_EMITTER_TAG, "consul-service-changes") + .tag(CHECKPOINT_TAG, "emitted") .checkpoint("consul-service-changes-emitted-distinct") - .name("consul-service-changes-emitted-distinct").metrics() + .tag(CHECKPOINT_TAG, "distinct") + .metrics() .doOnCancel { logger.warn("Cancelling watching consul service changes") watcher.close() @@ -226,10 +233,10 @@ class ConsulServiceChanges( if (ready) { val stopTimer = System.currentTimeMillis() readinessStateHandler.ready() - metrics.meterRegistry.timer("envoy-control.warmup.time") + metrics.meterRegistry.timer(ENVOY_CONTROL_WARM_UP_METRIC) .record( stopTimer - startTimer, - TimeUnit.MILLISECONDS + TimeUnit.MILLISECONDS /* stopTimer is a System.currentTimeMillis() reading, so the elapsed value is in milliseconds */ ) } } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index 912b5c89b..b7c9fda0a 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -1,6 +1,6 @@ package pl.allegro.tech.servicemesh.envoycontrol -import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension @@ -20,7 +20,13 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS import
pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG import java.util.function.Consumer +import java.util.function.Predicate class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTest { companion object { @@ -61,20 +67,23 @@ class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTe ) override fun expectedGrpcRequestsCounterValues() = mapOf( - CDS.name.toLowerCase() to isGreaterThanZero(), - EDS.name.toLowerCase() to isGreaterThanZero(), - LDS.name.toLowerCase() to isGreaterThanZero(), - RDS.name.toLowerCase() to isGreaterThanZero(), - SDS.name.toLowerCase() to isNull(), - ADS.name.toLowerCase() to isNull(), - UNKNOWN.name.toLowerCase() to isNull(), - "${CDS.name.toLowerCase()}.delta" to isNull(), - "${EDS.name.toLowerCase()}.delta" to isNull(), - "${LDS.name.toLowerCase()}.delta" to isNull(), - "${RDS.name.toLowerCase()}.delta" to isNull(), - "${SDS.name.toLowerCase()}.delta" to isNull(), - "${ADS.name.toLowerCase()}.delta" to isNull(), - "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + CDS.name.lowercase() to isGreaterThanZero(), + EDS.name.lowercase() to isGreaterThanZero(), + LDS.name.lowercase() to isGreaterThanZero(), + RDS.name.lowercase() to isGreaterThanZero(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), + ) + + override fun expectedGrpcRequestsDeltaCounterValues() = mapOf( + CDS.name.lowercase() to isNull(), + EDS.name.lowercase() to isNull(), + LDS.name.lowercase() to isNull(), + RDS.name.lowercase() to isNull(), + 
SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), ) } @@ -117,20 +126,23 @@ class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTes ) override fun expectedGrpcRequestsCounterValues() = mapOf( - CDS.name.toLowerCase() to isGreaterThanZero(), - EDS.name.toLowerCase() to isGreaterThanZero(), - LDS.name.toLowerCase() to isGreaterThanZero(), - RDS.name.toLowerCase() to isGreaterThanZero(), - SDS.name.toLowerCase() to isNull(), - ADS.name.toLowerCase() to isNull(), - UNKNOWN.name.toLowerCase() to isNull(), - "${CDS.name.toLowerCase()}.delta" to isNull(), - "${EDS.name.toLowerCase()}.delta" to isNull(), - "${LDS.name.toLowerCase()}.delta" to isNull(), - "${RDS.name.toLowerCase()}.delta" to isNull(), - "${SDS.name.toLowerCase()}.delta" to isNull(), - "${ADS.name.toLowerCase()}.delta" to isNull(), - "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + CDS.name.lowercase() to isGreaterThanZero(), + EDS.name.lowercase() to isGreaterThanZero(), + LDS.name.lowercase() to isGreaterThanZero(), + RDS.name.lowercase() to isGreaterThanZero(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), + ) + + override fun expectedGrpcRequestsDeltaCounterValues() = mapOf( + CDS.name.lowercase() to isNull(), + EDS.name.lowercase() to isNull(), + LDS.name.lowercase() to isNull(), + RDS.name.lowercase() to isNull(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), ) } @@ -173,24 +185,30 @@ class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbac ) override fun expectedGrpcRequestsCounterValues() = mapOf( - CDS.name.toLowerCase() to isNull(), - EDS.name.toLowerCase() to isNull(), - LDS.name.toLowerCase() to isNull(), - RDS.name.toLowerCase() to isNull(), - SDS.name.toLowerCase() to isNull(), - ADS.name.toLowerCase() to isNull(), - UNKNOWN.name.toLowerCase() to 
isNull(), - "${CDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${EDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${LDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${RDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${SDS.name.toLowerCase()}.delta" to isNull(), - "${ADS.name.toLowerCase()}.delta" to isNull(), - "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + CDS.name.lowercase() to isNull(), + EDS.name.lowercase() to isNull(), + LDS.name.lowercase() to isNull(), + RDS.name.lowercase() to isNull(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), + ) + + override fun expectedGrpcRequestsDeltaCounterValues() = mapOf( + CDS.name.lowercase() to isGreaterThanZero(), + EDS.name.lowercase() to isGreaterThanZero(), + LDS.name.lowercase() to isGreaterThanZero(), + RDS.name.lowercase() to isGreaterThanZero(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), ) } interface MetricsDiscoveryServerCallbacksTest { + companion object { + private val logger by logger() + } fun consul(): ConsulExtension @@ -204,7 +222,7 @@ interface MetricsDiscoveryServerCallbacksTest { fun expectedGrpcRequestsCounterValues(): Map Boolean> - fun MeterRegistry.counterValue(name: String) = this.find(name).counter()?.count()?.toInt() + fun expectedGrpcRequestsDeltaCounterValues(): Map Boolean> fun isGreaterThanZero() = { x: Int? -> x!! 
> 0 } @@ -215,13 +233,22 @@ interface MetricsDiscoveryServerCallbacksTest { // given val meterRegistry = envoyControl().app.meterRegistry() consul().server.operations.registerService(service(), name = "echo") - + for (meter in meterRegistry.meters) { + print(meter.toString()) + } // expect untilAsserted { expectedGrpcConnectionsGaugeValues().forEach { (type, value) -> - val metric = "grpc.connections.${type.name.toLowerCase()}" - assertThat(meterRegistry.find(metric).gauge()).isNotNull - assertThat(meterRegistry.get(metric).gauge().value().toInt()).isEqualTo(value) + val metric = CONNECTIONS_METRIC + assertThat( + meterRegistry.find(metric) + .tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge() + ).isNotNull + assertThat( + meterRegistry.get(metric) + .tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge().value() + .toInt() + ).isEqualTo(value) } } } @@ -229,16 +256,22 @@ interface MetricsDiscoveryServerCallbacksTest { @Test fun `should measure gRPC requests`() { // given - val meterRegistry = envoyControl().app.meterRegistry() consul().server.operations.registerService(service(), name = "echo") // expect untilAsserted { - expectedGrpcRequestsCounterValues().forEach { (type, condition) -> - val counterValue = meterRegistry.counterValue("grpc.requests.$type") - println("$type $counterValue") - assertThat(counterValue).satisfies(Consumer { condition(it) }) + expectedGrpcRequestsCounterValues().forEach { + assertCondition(it.key, it.value, "total") } } } + + private fun assertCondition(type: String, condition: Predicate, reqTpe: String) { + val counterValue = + envoyControl().app.meterRegistry().find(REQUESTS_METRIC) + .tags(Tags.of(STREAM_TYPE_TAG, type, DISCOVERY_REQ_TYPE_TAG, reqTpe, CONNECTION_TYPE_TAG, "grpc")) + .counter()?.count()?.toInt() + logger.info("$type $counterValue") + assertThat(counterValue).satisfies(Consumer { condition.test(it) }) + } }