Skip to content

Commit

Permalink
Add metrics for queue drops/exceptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrozev committed May 2, 2024
1 parent 7a67c9e commit 0f85196
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class RtpReceiverImpl @JvmOverloads constructor(
}

companion object {
val queueErrorCounter = CountingErrorHandler()
var queueErrorCounter = CountingErrorHandler()

private const val PACKET_QUEUE_ENTRY_EVENT = "Entered RTP receiver incoming queue"
private const val PACKET_QUEUE_EXIT_EVENT = "Exited RTP receiver incoming queue"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class RtpSenderImpl(
}

companion object {
val queueErrorCounter = CountingErrorHandler()
var queueErrorCounter = CountingErrorHandler()

private const val PACKET_QUEUE_ENTRY_EVENT = "Entered RTP sender incoming queue"
private const val PACKET_QUEUE_EXIT_EVENT = "Exited RTP sender incoming queue"
Expand Down
15 changes: 5 additions & 10 deletions jvb/src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,7 @@ public Conference(Videobridge videobridge,
this.id = Objects.requireNonNull(id, "id");
this.conferenceName = conferenceName;
this.colibri2Handler = new Colibri2ConferenceHandler(this, logger);
colibriQueue = new PacketQueue<>(
Integer.MAX_VALUE,
true,
"colibri-queue",
colibriQueue = new ColibriQueue(
request ->
{
try
Expand Down Expand Up @@ -256,12 +253,10 @@ public Conference(Videobridge videobridge,
e.getMessage()));
}
return true;
},
TaskPools.IO_POOL,
Clock.systemUTC(), // TODO: using the Videobridge clock breaks tests somehow
/* Allow running tasks to complete (so we can close the queue from within the task. */
false
);
}
)
{
};

speechActivity = new ConferenceSpeechActivity(new SpeechActivityListener());
updateLastNEndpointsFuture = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package org.jitsi.videobridge

import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.BridgeChannelMessage.Companion.parse
import org.jitsi.videobridge.message.MessageHandler
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.json.simple.JSONObject
import java.io.IOException
Expand All @@ -31,7 +34,7 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH

abstract val isConnected: Boolean

private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue(
private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue<MessageAndSource>(
50,
true,
INCOMING_MESSAGE_QUEUE_ID,
Expand All @@ -47,7 +50,7 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH
},
TaskPools.IO_POOL,
Clock.systemUTC()
)
).apply { setErrorHandler(queueErrorCounter) }

/**
* Fires the message transport ready event for the associated endpoint.
Expand Down Expand Up @@ -97,5 +100,23 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH

companion object {
const val INCOMING_MESSAGE_QUEUE_ID = "bridge-channel-message-incoming-queue"
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"endpoint_receive_message_queue_dropped_packets",
"Number of packets dropped out of the Endpoint receive message queue."
)
private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"endpoint_receive_message_queue_exceptions",
"Number of exceptions from the Endpoint receive message queue."
)
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}
}
}
68 changes: 68 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/ColibriQueue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge

import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.videobridge.xmpp.XmppConnection
import java.time.Clock
import kotlin.Int.Companion.MAX_VALUE

abstract class ColibriQueue(packetHandler: PacketHandler<XmppConnection.ColibriRequest>) :
PacketQueue<XmppConnection.ColibriRequest>(
MAX_VALUE,
true,
QUEUE_NAME,
packetHandler,
TaskPools.IO_POOL,
// TODO: using the Videobridge clock breaks tests somehow
Clock.systemUTC(),
// Allow running tasks to complete (so we can close the queue from within the task.
false,
) {
init {
setErrorHandler(queueErrorCounter)
}

companion object {
val QUEUE_NAME = "colibri-queue"

val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"colibri_queue_dropped_packets",
"Number of packets dropped out of the Colibri queue."
)

val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"colibri_queue_exceptions",
"Number of exceptions from the Colibri queue."
)

/** Count the number of dropped packets and exceptions. */
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}
}
}
27 changes: 23 additions & 4 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.ForwardedSourcesMessage
import org.jitsi.videobridge.message.ReceiverVideoConstraintsMessage
import org.jitsi.videobridge.message.SenderSourceConstraintsMessage
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.relay.AudioSourceDesc
import org.jitsi.videobridge.relay.RelayedEndpoint
import org.jitsi.videobridge.rest.root.debug.EndpointDebugFeatures
Expand Down Expand Up @@ -1109,11 +1111,28 @@ class Endpoint @JvmOverloads constructor(
*/
private const val OPEN_DATA_CHANNEL_LOCALLY = false

/**
* Count the number of dropped packets and exceptions.
*/
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"srtp_send_queue_dropped_packets",
"Number of packets dropped out of the Endpoint SRTP send queue."
)

private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"srtp_send_queue_exceptions",
"Number of exceptions from the Endpoint SRTP send queue."
)

/** Count the number of dropped packets and exceptions. */
@JvmField
val queueErrorCounter = CountingErrorHandler()
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}

/**
* The executor which runs bandwidth probing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object Metrics {
if (JvmMetrics.enable) {
metricsUpdater.addUpdateTask { JvmMetrics.update() }
}
QueueMetrics.init()
}
fun stop() {
metricsUpdater.stop()
Expand Down
73 changes: 73 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/metrics/QueueMetrics.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.metrics

import org.jitsi.nlj.RtpReceiverImpl
import org.jitsi.nlj.RtpSenderImpl
import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer.Companion.instance as metricsContainer

object QueueMetrics {
private val rtpReceiverDroppedPackets = metricsContainer.registerCounter(
"rtp_receiver_dropped_packets",
"Number of packets dropped out of the RTP receiver queue."
)
private val rtpReceiverExceptions = metricsContainer.registerCounter(
"rtp_receiver_exceptions",
"Number of exceptions from the RTP receiver queue."
)
private val rtpSenderDroppedPackets = metricsContainer.registerCounter(
"rtp_sender_dropped_packets",
"Number of packets dropped out of the RTP sender queue."
)
private val rtpSenderExceptions = metricsContainer.registerCounter(
"rtp_sender_exceptions",
"Number of exceptions from the RTP sender queue."
)

val droppedPackets = metricsContainer.registerCounter(
"queue_dropped_packets",
"Number of packets dropped from any of the queues."
)
val exceptions = metricsContainer.registerCounter(
"queue_exceptions",
"Number of exceptions from any of the queues."
)

fun init() {
RtpReceiverImpl.queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
rtpReceiverDroppedPackets.inc()
droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
rtpReceiverExceptions.inc()
exceptions.inc()
}
}

RtpSenderImpl.queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
rtpSenderDroppedPackets.inc()
droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
rtpSenderExceptions.inc()
exceptions.inc()
}
}
}
}
28 changes: 24 additions & 4 deletions jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ import org.jitsi.videobridge.datachannel.protocol.DataChannelPacket
import org.jitsi.videobridge.datachannel.protocol.DataChannelProtocolConstants
import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.SourceVideoTypeMessage
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.rest.root.debug.EndpointDebugFeatures
import org.jitsi.videobridge.sctp.DataChannelHandler
import org.jitsi.videobridge.sctp.SctpHandler
Expand Down Expand Up @@ -1135,11 +1137,29 @@ class Relay @JvmOverloads constructor(
}

companion object {
/**
* Count the number of dropped packets and exceptions.
*/
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"relay_srtp_send_queue_dropped_packets",
"Number of packets dropped out of the Relay SRTP send queue."
)

private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"relay_srtp_send_queue_exceptions",
"Number of exceptions from the Relay SRTP send queue."
)

/** Count the number of dropped packets and exceptions. */
@JvmField
val queueErrorCounter = CountingErrorHandler()
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}

override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}

private const val SRTP_QUEUE_ENTRY_EVENT = "Entered Relay SRTP sender outgoing queue"
private const val SRTP_QUEUE_EXIT_EVENT = "Exited Relay SRTP sender outgoing queue"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import org.jitsi.utils.logging2.cdebug
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.videobridge.TransportConfig
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.transport.ice.IceTransport
import org.jitsi.videobridge.util.TaskPools
import org.json.simple.JSONObject
Expand Down Expand Up @@ -163,11 +165,29 @@ class RelayEndpointSender(
}

companion object {
/**
* Count the number of dropped packets and exceptions.
*/
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"relay_endpoint_srtp_send_queue_dropped_packets",
"Number of packets dropped out of the Relay SRTP send queue."
)

private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"relay_endpoint_srtp_send_queue_exceptions",
"Number of exceptions from the Relay SRTP send queue."
)

/** Count the number of dropped packets and exceptions. */
@JvmField
val queueErrorCounter = CountingErrorHandler()
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}

override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}

private const val SRTP_QUEUE_ENTRY_EVENT = "Entered RelayEndpointSender SRTP sender outgoing queue"
}
Expand Down

0 comments on commit 0f85196

Please sign in to comment.