Skip to content

Commit

Permalink
feat: Export more metrics to prometheus. (#2129)
Browse files Browse the repository at this point in the history
* feat: Export JVM metrics to prometheus.
* feat: Add a startup_time metric for detection of restarts.
* feat: Add metrics for queue drops/exceptions.
  • Loading branch information
bgrozev authored May 2, 2024
1 parent b623d6a commit acf024b
Show file tree
Hide file tree
Showing 15 changed files with 369 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class RtpReceiverImpl @JvmOverloads constructor(
}

companion object {
val queueErrorCounter = CountingErrorHandler()
var queueErrorCounter = CountingErrorHandler()

private const val PACKET_QUEUE_ENTRY_EVENT = "Entered RTP receiver incoming queue"
private const val PACKET_QUEUE_EXIT_EVENT = "Exited RTP receiver incoming queue"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class RtpSenderImpl(
}

companion object {
val queueErrorCounter = CountingErrorHandler()
var queueErrorCounter = CountingErrorHandler()

private const val PACKET_QUEUE_ENTRY_EVENT = "Entered RTP sender incoming queue"
private const val PACKET_QUEUE_EXIT_EVENT = "Exited RTP sender incoming queue"
Expand Down
15 changes: 5 additions & 10 deletions jvb/src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,7 @@ public Conference(Videobridge videobridge,
this.id = Objects.requireNonNull(id, "id");
this.conferenceName = conferenceName;
this.colibri2Handler = new Colibri2ConferenceHandler(this, logger);
colibriQueue = new PacketQueue<>(
Integer.MAX_VALUE,
true,
"colibri-queue",
colibriQueue = new ColibriQueue(
request ->
{
try
Expand Down Expand Up @@ -256,12 +253,10 @@ public Conference(Videobridge videobridge,
e.getMessage()));
}
return true;
},
TaskPools.IO_POOL,
Clock.systemUTC(), // TODO: using the Videobridge clock breaks tests somehow
/* Allow running tasks to complete (so we can close the queue from within the task. */
false
);
}
)
{
};

speechActivity = new ConferenceSpeechActivity(new SpeechActivityListener());
updateLastNEndpointsFuture = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package org.jitsi.videobridge

import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.BridgeChannelMessage.Companion.parse
import org.jitsi.videobridge.message.MessageHandler
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.json.simple.JSONObject
import java.io.IOException
Expand All @@ -31,7 +34,7 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH

abstract val isConnected: Boolean

private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue(
private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue<MessageAndSource>(
50,
true,
INCOMING_MESSAGE_QUEUE_ID,
Expand All @@ -47,7 +50,7 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH
},
TaskPools.IO_POOL,
Clock.systemUTC()
)
).apply { setErrorHandler(queueErrorCounter) }

/**
* Fires the message transport ready event for the associated endpoint.
Expand Down Expand Up @@ -97,5 +100,23 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH

companion object {
    const val INCOMING_MESSAGE_QUEUE_ID = "bridge-channel-message-incoming-queue"

    /** Counter incremented each time [queueErrorCounter] is notified of a dropped packet. */
    private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
        "endpoint_receive_message_queue_dropped_packets",
        "Number of packets dropped out of the Endpoint receive message queue."
    )

    /** Counter incremented each time [queueErrorCounter] is notified of a packet handling failure. */
    private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
        "endpoint_receive_message_queue_exceptions",
        "Number of exceptions from the Endpoint receive message queue."
    )

    /**
     * Error handler that, in addition to the counting done by [CountingErrorHandler], bumps the
     * queue-specific counter above and the corresponding aggregate counter in [QueueMetrics].
     */
    val queueErrorCounter = object : CountingErrorHandler() {
        override fun packetDropped() {
            // Let the base handler do its own bookkeeping first, then update the metrics.
            super.packetDropped()
            droppedPacketsMetric.inc()
            QueueMetrics.droppedPackets.inc()
        }

        override fun packetHandlingFailed(t: Throwable?) {
            super.packetHandlingFailed(t)
            exceptionsMetric.inc()
            QueueMetrics.exceptions.inc()
        }
    }
}
}
68 changes: 68 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/ColibriQueue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge

import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.videobridge.xmpp.XmppConnection
import java.time.Clock
import kotlin.Int.Companion.MAX_VALUE

/**
 * A [PacketQueue] of [XmppConnection.ColibriRequest]s, named [QUEUE_NAME] and running on
 * [TaskPools.IO_POOL], whose error handler is always the shared [queueErrorCounter].
 *
 * @param packetHandler the handler forwarded unchanged to the [PacketQueue] constructor.
 */
abstract class ColibriQueue(packetHandler: PacketHandler<XmppConnection.ColibriRequest>) :
    PacketQueue<XmppConnection.ColibriRequest>(
        MAX_VALUE,
        true,
        QUEUE_NAME,
        packetHandler,
        TaskPools.IO_POOL,
        // TODO: using the Videobridge clock breaks tests somehow
        Clock.systemUTC(),
        // Allow running tasks to complete (so we can close the queue from within the task).
        false,
    ) {
    init {
        // Every instance reports drops and exceptions to the same companion-level handler.
        setErrorHandler(queueErrorCounter)
    }

    companion object {
        /** The name given to every queue of this type. */
        val QUEUE_NAME: String = "colibri-queue"

        /** Counter incremented each time [queueErrorCounter] is notified of a dropped packet. */
        val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
            "colibri_queue_dropped_packets",
            "Number of packets dropped out of the Colibri queue."
        )

        /** Counter incremented each time [queueErrorCounter] is notified of a packet handling failure. */
        val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
            "colibri_queue_exceptions",
            "Number of exceptions from the Colibri queue."
        )

        /**
         * Counts dropped packets and exceptions: on top of what [CountingErrorHandler] does, it bumps the
         * Colibri-specific counters above and the corresponding aggregate counters in [QueueMetrics].
         */
        val queueErrorCounter = object : CountingErrorHandler() {
            override fun packetDropped() {
                // Base bookkeeping first, then the metrics.
                super.packetDropped()
                droppedPacketsMetric.inc()
                QueueMetrics.droppedPackets.inc()
            }

            override fun packetHandlingFailed(t: Throwable?) {
                super.packetHandlingFailed(t)
                exceptionsMetric.inc()
                QueueMetrics.exceptions.inc()
            }
        }
    }
}
27 changes: 23 additions & 4 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.ForwardedSourcesMessage
import org.jitsi.videobridge.message.ReceiverVideoConstraintsMessage
import org.jitsi.videobridge.message.SenderSourceConstraintsMessage
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.relay.AudioSourceDesc
import org.jitsi.videobridge.relay.RelayedEndpoint
import org.jitsi.videobridge.rest.root.debug.EndpointDebugFeatures
Expand Down Expand Up @@ -1109,11 +1111,28 @@ class Endpoint @JvmOverloads constructor(
*/
private const val OPEN_DATA_CHANNEL_LOCALLY = false

/**
* Count the number of dropped packets and exceptions.
*/
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"srtp_send_queue_dropped_packets",
"Number of packets dropped out of the Endpoint SRTP send queue."
)

private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"srtp_send_queue_exceptions",
"Number of exceptions from the Endpoint SRTP send queue."
)

/** Count the number of dropped packets and exceptions. */
@JvmField
val queueErrorCounter = CountingErrorHandler()
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}

/**
* The executor which runs bandwidth probing.
Expand Down
106 changes: 106 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/metrics/JvmMetrics.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.metrics

import com.sun.management.UnixOperatingSystemMXBean
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.config
import org.jitsi.utils.logging2.createLogger
import java.lang.management.ManagementFactory
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer.Companion.instance as metricsContainer

/**
 * Publishes JVM-level gauges (thread count, GC count and time, open file descriptors and usage of one
 * GC-specific memory pool) through the metrics container.
 *
 * Instances are only created via the companion's [INSTANCE], and only when [enable] is true.
 */
class JvmMetrics private constructor() {
    val logger = createLogger()

    /**
     * The GC type, guessed from the name of the first garbage collector MX bean (case-insensitive
     * substring match). Falls back to [GcType.Other] when there is no bean or the name is not recognized.
     */
    private val gcType = ManagementFactory.getGarbageCollectorMXBeans().firstOrNull()?.name.let {
        when {
            it?.contains("shenandoah", ignoreCase = true) == true -> GcType.Shenandoah
            it?.contains("zgc", ignoreCase = true) == true -> GcType.Zgc
            it?.contains("g1", ignoreCase = true) == true -> GcType.G1
            else -> GcType.Other
        }
    }.also {
        logger.info("Detected GC type $it")
    }

    /** Reads the current values from the platform MX beans and stores them in the gauges. */
    fun update() {
        threadCount.set(ManagementFactory.getThreadMXBean().threadCount.toLong())

        // Fetch the bean list once and reuse it for both sums.
        val gcBeans = ManagementFactory.getGarbageCollectorMXBeans()
        // collectionCount/collectionTime are -1 when undefined for a collector; clamp to 0 so that
        // such collectors do not subtract from the totals.
        gcCount.set(gcBeans.sumOf { it.collectionCount.coerceAtLeast(0L) })
        gcTime.set(gcBeans.sumOf { it.collectionTime.coerceAtLeast(0L) })

        // The open file descriptor count is only exposed by the Unix-specific OS bean.
        (ManagementFactory.getOperatingSystemMXBean() as? UnixOperatingSystemMXBean)?.let {
            openFdCount.set(it.openFileDescriptorCount)
        }

        if (gcType != GcType.Other) {
            // getUsage() returns null when the pool is no longer valid; skip the update in that case
            // instead of throwing from the update task.
            ManagementFactory.getMemoryPoolMXBeans().find { it.name == gcType.memoryPoolName }?.usage?.let {
                heapUsed.set(it.used)
                heapCommitted.set(it.committed)
            }
        }
    }

    val threadCount = metricsContainer.registerLongGauge(
        "thread_count",
        "Current number of JVM threads."
    )

    private val gcCount = metricsContainer.registerLongGauge(
        "jvm_gc_count",
        "Garbage collection count."
    )

    private val gcTime = metricsContainer.registerLongGauge(
        "jvm_gc_time",
        "Garbage collection time."
    )

    private val heapCommitted = metricsContainer.registerLongGauge(
        "jvm_heap_committed",
        "Capacity of the main memory pool for the heap (GC type specific)."
    )

    private val heapUsed = metricsContainer.registerLongGauge(
        "jvm_heap_used",
        "Usage of the main memory pool for the heap (GC type specific)."
    )

    private val openFdCount = metricsContainer.registerLongGauge(
        "jvm_open_fd_count",
        "Number of open file descriptors."
    )

    private enum class GcType(
        /** The name of the memory pool we're interested in with this type of GC. */
        val memoryPoolName: String?
    ) {
        G1("G1 Old Gen"),
        Zgc("ZHeap"),
        Shenandoah("Shenandoah"),
        Other(null)
    }

    companion object {
        /** Whether JVM metrics are enabled, read from the `videobridge.stats.jvm.enabled` config key. */
        val enable: Boolean by config {
            "videobridge.stats.jvm.enabled".from(JitsiConfig.newConfig)
        }

        /** The single instance, or null when [enable] is false. */
        val INSTANCE = if (enable) JvmMetrics() else null

        /** Updates the metrics of [INSTANCE] if it exists; does nothing otherwise. */
        fun update() = INSTANCE?.update()
    }
}
7 changes: 6 additions & 1 deletion jvb/src/main/kotlin/org/jitsi/videobridge/metrics/Metrics.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ object Metrics {
val lock: Any
get() = metricsUpdater

fun start() = metricsUpdater.addUpdateTask { ThreadsMetric.update() }
fun start() {
if (JvmMetrics.enable) {
metricsUpdater.addUpdateTask { JvmMetrics.update() }
}
QueueMetrics.init()
}
fun stop() {
metricsUpdater.stop()
executor.shutdown()
Expand Down
Loading

0 comments on commit acf024b

Please sign in to comment.