Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an sctp-pcap-dump feature. #2124

Merged
merged 6 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class RtpReceiverImpl @JvmOverloads constructor(
node(remoteBandwidthEstimator)
// This reads audio levels from packets that use cryptex. TODO: should it go in the Audio path?
node(audioLevelReader.postDecryptNode)
node(toggleablePcapWriter.newObserverNode())
node(toggleablePcapWriter.newObserverNode(outbound = false))
node(statsTracker)
node(PaddingTermination(logger))
demux("Media Type") {
Expand Down Expand Up @@ -259,7 +259,7 @@ class RtpReceiverImpl @JvmOverloads constructor(
predicate = PacketPredicate(Packet::looksLikeRtcp)
path = pipeline {
node(srtcpDecryptWrapper)
node(toggleablePcapWriter.newObserverNode())
node(toggleablePcapWriter.newObserverNode(outbound = false))
node(CompoundRtcpParser(logger))
node(rtcpTermination)
node(packetHandlerWrapper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class RtpSenderImpl(
node(statsTracker)
node(TccSeqNumTagger(transportCcEngine, streamInformationStore))
node(HeaderExtEncoder(streamInformationStore, logger))
node(toggleablePcapWriter.newObserverNode())
node(toggleablePcapWriter.newObserverNode(outbound = true))
node(srtpEncryptWrapper)
node(packetStreamStats.createNewNode())
node(PacketLossNode(packetLossConfig), condition = { packetLossConfig.enabled })
Expand Down Expand Up @@ -181,7 +181,7 @@ class RtpSenderImpl(
packetInfo
}
node(rtcpSrUpdater)
node(toggleablePcapWriter.newObserverNode())
node(toggleablePcapWriter.newObserverNode(outbound = true))
node(srtcpEncryptWrapper)
node(packetStreamStats.createNewNode())
node(PacketLossNode(packetLossConfig), condition = { packetLossConfig.enabled })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import kotlin.io.path.Path
class PcapWriter(
parentLogger: Logger,
filePath: Path = Path(directory, "${Random().nextLong()}.pcap")
) : ObserverNode("PCAP writer") {
) {
constructor(parentLogger: Logger, filePath: String) : this(parentLogger, Path(filePath))

private val logger = createChildLogger(parentLogger)
Expand All @@ -60,28 +60,52 @@ class PcapWriter(

companion object {
private val localhost = Inet4Address.getByName("127.0.0.1") as Inet4Address
private val remotehost = Inet4Address.getByName("192.0.2.0") as Inet4Address

private val localport = UdpPort(123, "blah")
private val remoteport = UdpPort(456, "blah")

val directory: String by config("jmt.debug.pcap.directory".from(JitsiConfig.newConfig))
}

override fun observe(packetInfo: PacketInfo) {
fun observe(packetInfo: PacketInfo, outbound: Boolean) =
observe(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length, outbound)

fun observe(buffer: ByteArray, offset: Int, length: Int, outbound: Boolean) {
val udpPayload = UnknownPacket.Builder()
// We can't pass offset/limit values to udpPayload.rawData, so we need to create an array that contains
// only exactly what we want to write
val subBuf = ByteArray(packetInfo.packet.length)
System.arraycopy(packetInfo.packet.buffer, packetInfo.packet.offset, subBuf, 0, packetInfo.packet.length)
val subBuf = ByteArray(length)
System.arraycopy(buffer, offset, subBuf, 0, length)
udpPayload.rawData(subBuf)
val srchost: Inet4Address
val dsthost: Inet4Address
val srcport: UdpPort
val dstport: UdpPort
if (outbound) {
srchost = localhost
srcport = localport
dsthost = remotehost
dstport = remoteport
} else {
srchost = remotehost
srcport = remoteport
dsthost = localhost
dstport = localport
}

val udp = UdpPacket.Builder()
.srcPort(UdpPort(123, "blah"))
.dstPort(UdpPort(456, "blah"))
.srcAddr(localhost)
.dstAddr(localhost)
.srcPort(srcport)
.dstPort(dstport)
.srcAddr(srchost)
.dstAddr(dsthost)
.correctChecksumAtBuild(true)
.correctLengthAtBuild(true)
.payloadBuilder(udpPayload)

val ipPacket = IpV4Packet.Builder()
.srcAddr(localhost)
.dstAddr(localhost)
.srcAddr(srchost)
.dstAddr(dsthost)
.protocol(IpNumber.UDP)
.version(IpVersion.IPV4)
.tos(IpV4Rfc1349Tos.newInstance(0))
Expand All @@ -99,8 +123,6 @@ class PcapWriter(
writer.dump(eth)
}

override fun trace(f: () -> Unit) = f.invoke()

fun close() {
if (lazyWriter.isInitialized() && writer.isOpen) {
writer.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ class ToggleablePcapWriter(

fun isEnabled(): Boolean = pcapWriter != null

fun newObserverNode(): Node = PcapWriterNode("Toggleable pcap writer: $prefix")
fun newObserverNode(outbound: Boolean) = PcapWriterNode("Toggleable pcap writer: $prefix", outbound)

private inner class PcapWriterNode(name: String) : ObserverNode(name) {
inner class PcapWriterNode(name: String, val outbound: Boolean) : ObserverNode(name) {
override fun observe(packetInfo: PacketInfo) {
pcapWriter?.processPacket(packetInfo)
pcapWriter?.observe(packetInfo, outbound)
}

fun observe(buffer: ByteArray, offset: Int, length: Int) {
pcapWriter?.observe(buffer, offset, length, outbound)
}

override fun trace(f: () -> Unit) = f.invoke()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DtlsTest : ShouldSpec() {
// The DTLS server's send is wired directly to the DTLS client's receive
dtlsServer.outgoingDataHandler = object : DtlsStack.OutgoingDataHandler {
override fun sendData(data: ByteArray, off: Int, len: Int) {
pcapWriter?.processPacket(PacketInfo(UnparsedPacket(data, off, len)))
pcapWriter?.observe(PacketInfo(UnparsedPacket(data, off, len)), outbound = false)
dtlsClient.processIncomingProtocolData(data, off, len)
}
}
Expand All @@ -79,7 +79,7 @@ class DtlsTest : ShouldSpec() {
// The DTLS client's send is wired directly to the DTLS server's receive
dtlsClient.outgoingDataHandler = object : DtlsStack.OutgoingDataHandler {
override fun sendData(data: ByteArray, off: Int, len: Int) {
pcapWriter?.processPacket(PacketInfo(UnparsedPacket(data, off, len)))
pcapWriter?.observe(PacketInfo(UnparsedPacket(data, off, len)), outbound = true)
dtlsServer.processIncomingProtocolData(data, off, len)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

public enum EndpointDebugFeatures
{
PCAP_DUMP("pcap-dump");

PCAP_DUMP("pcap-dump"),
SCTP_PCAP_DUMP("sctp-pcap-dump");

private final String value;

Expand Down
21 changes: 20 additions & 1 deletion jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.jitsi.nlj.rtp.VideoRtpPacket
import org.jitsi.nlj.srtp.TlsRole
import org.jitsi.nlj.stats.EndpointConnectionStats
import org.jitsi.nlj.transform.node.ConsumerNode
import org.jitsi.nlj.transform.node.ToggleablePcapWriter
import org.jitsi.nlj.transform.pipeline
import org.jitsi.nlj.util.Bandwidth
import org.jitsi.nlj.util.LocalSsrcAssociation
import org.jitsi.nlj.util.NEVER
Expand Down Expand Up @@ -125,6 +127,15 @@ class Endpoint @JvmOverloads constructor(
private val sctpHandler = SctpHandler()
private val dataChannelHandler = DataChannelHandler()

private val toggleablePcapWriter = ToggleablePcapWriter(logger, "$id-sctp")
private val sctpRecvPcap = toggleablePcapWriter.newObserverNode(outbound = false)
private val sctpSendPcap = toggleablePcapWriter.newObserverNode(outbound = true)

private val sctpPipeline = pipeline {
node(sctpRecvPcap)
node(sctpHandler)
}

/* TODO: do we ever want to support useUniquePort for an Endpoint? */
private val iceTransport = IceTransport(id, iceControlling, false, supportsPrivateAddresses, logger)
private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.endpoint }
Expand Down Expand Up @@ -460,12 +471,19 @@ class Endpoint @JvmOverloads constructor(
fun setFeature(feature: EndpointDebugFeatures, enabled: Boolean) {
when (feature) {
EndpointDebugFeatures.PCAP_DUMP -> transceiver.setFeature(Features.TRANSCEIVER_PCAP_DUMP, enabled)
EndpointDebugFeatures.SCTP_PCAP_DUMP ->
if (enabled) {
toggleablePcapWriter.enable()
} else {
toggleablePcapWriter.disable()
}
}
}

fun isFeatureEnabled(feature: EndpointDebugFeatures): Boolean {
return when (feature) {
EndpointDebugFeatures.PCAP_DUMP -> transceiver.isFeatureEnabled(Features.TRANSCEIVER_PCAP_DUMP)
EndpointDebugFeatures.SCTP_PCAP_DUMP -> toggleablePcapWriter.isEnabled()
}
}

Expand Down Expand Up @@ -522,7 +540,7 @@ class Endpoint @JvmOverloads constructor(
*/
// TODO(brian): change sctp handler to take buf, off, len
fun dtlsAppPacketReceived(data: ByteArray, off: Int, len: Int) =
sctpHandler.processPacket(PacketInfo(UnparsedPacket(data, off, len)))
sctpPipeline.processPacket(PacketInfo(UnparsedPacket(data, off, len)))

private fun effectiveVideoConstraintsChanged(
oldEffectiveConstraints: EffectiveConstraintsMap,
Expand Down Expand Up @@ -577,6 +595,7 @@ class Endpoint @JvmOverloads constructor(
// Create the SctpManager and provide it a method for sending SCTP data
sctpManager = SctpManager(
{ data, offset, length ->
sctpSendPcap.observe(data, offset, length)
dtlsTransport.sendDtlsData(data, offset, length)
0
},
Expand Down
21 changes: 20 additions & 1 deletion jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import org.jitsi.nlj.srtp.SrtpUtil
import org.jitsi.nlj.srtp.TlsRole
import org.jitsi.nlj.stats.EndpointConnectionStats
import org.jitsi.nlj.transform.node.ConsumerNode
import org.jitsi.nlj.transform.node.ToggleablePcapWriter
import org.jitsi.nlj.transform.pipeline
import org.jitsi.nlj.util.Bandwidth
import org.jitsi.nlj.util.BufferPool
import org.jitsi.nlj.util.LocalSsrcAssociation
Expand Down Expand Up @@ -168,6 +170,15 @@ class Relay @JvmOverloads constructor(
private val sctpHandler = SctpHandler()
private val dataChannelHandler = DataChannelHandler()

private val toggleablePcapWriter = ToggleablePcapWriter(logger, "$id-sctp")
private val sctpRecvPcap = toggleablePcapWriter.newObserverNode(outbound = false)
private val sctpSendPcap = toggleablePcapWriter.newObserverNode(outbound = true)

private val sctpPipeline = pipeline {
node(sctpRecvPcap)
node(sctpHandler)
}

private val iceTransport = IceTransport(
id = id,
controlling = iceControlling,
Expand Down Expand Up @@ -447,6 +458,7 @@ class Relay @JvmOverloads constructor(
// Create the SctpManager and provide it a method for sending SCTP data
val sctpManager = SctpManager(
{ data, offset, length ->
sctpSendPcap.observe(data, offset, length)
dtlsTransport.sendDtlsData(data, offset, length)
0
},
Expand Down Expand Up @@ -626,12 +638,19 @@ class Relay @JvmOverloads constructor(
}
senders.values.forEach { s -> s.setFeature(Features.TRANSCEIVER_PCAP_DUMP, enabled) }
}
EndpointDebugFeatures.SCTP_PCAP_DUMP ->
if (enabled) {
toggleablePcapWriter.enable()
} else {
toggleablePcapWriter.disable()
}
}
}

fun isFeatureEnabled(feature: EndpointDebugFeatures): Boolean {
return when (feature) {
EndpointDebugFeatures.PCAP_DUMP -> transceiver.isFeatureEnabled(Features.TRANSCEIVER_PCAP_DUMP)
EndpointDebugFeatures.SCTP_PCAP_DUMP -> toggleablePcapWriter.isEnabled()
}
}

Expand Down Expand Up @@ -738,7 +757,7 @@ class Relay @JvmOverloads constructor(
*/
// TODO(brian): change sctp handler to take buf, off, len
fun dtlsAppPacketReceived(data: ByteArray, off: Int, len: Int) =
sctpHandler.processPacket(PacketInfo(UnparsedPacket(data, off, len)))
sctpPipeline.processPacket(PacketInfo(UnparsedPacket(data, off, len)))

/**
* Return the newly created endpoint, or null if an endpoint with that ID already existed. Note that the new
Expand Down
Loading