From 380cc72f6c3ccae7a4d6e3be24c3176d40093320 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 29 Jul 2022 03:27:42 -0400 Subject: [PATCH] Add playout delay header interceptor This interceptor adds the playout delay header extension following https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay --- .../header_extension_interceptor.go | 61 +++++++++++++++++ .../header_extension_interceptor_test.go | 68 +++++++++++++++++++ pkg/playoutdelay/playout_delay.go | 52 ++++++++++++++ 3 files changed, 181 insertions(+) create mode 100644 pkg/playoutdelay/header_extension_interceptor.go create mode 100644 pkg/playoutdelay/header_extension_interceptor_test.go create mode 100644 pkg/playoutdelay/playout_delay.go diff --git a/pkg/playoutdelay/header_extension_interceptor.go b/pkg/playoutdelay/header_extension_interceptor.go new file mode 100644 index 00000000..358c3050 --- /dev/null +++ b/pkg/playoutdelay/header_extension_interceptor.go @@ -0,0 +1,61 @@ +package playoutdelay + +import ( + "time" + + "github.com/pion/interceptor" + "github.com/pion/rtp" +) + +// HeaderExtensionInterceptorFactory is a interceptor.Factory for a HeaderExtensionInterceptor +type HeaderExtensionInterceptorFactory struct{} + +// NewInterceptor constructs a new HeaderExtensionInterceptor +func (h *HeaderExtensionInterceptorFactory) NewInterceptor(id string, minDelay, maxDelay time.Duration) (interceptor.Interceptor, error) { + if minDelay.Milliseconds() < 0 || minDelay.Milliseconds() > 40950 || maxDelay.Milliseconds() < 0 || maxDelay.Milliseconds() > 40950 { + return nil, errPlayoutDelayInvalidValue + } + return &HeaderExtensionInterceptor{minDelay: uint16(minDelay.Milliseconds() / 10), maxDelay: uint16(maxDelay.Milliseconds() / 10)}, nil +} + +// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptorFactory +func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptorFactory, error) { + return &HeaderExtensionInterceptorFactory{}, nil +} + +// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet +type HeaderExtensionInterceptor struct { + interceptor.NoOp + minDelay, maxDelay uint16 +} + +const playoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay" + +// BindLocalStream returns a writer that adds a rtp.TransportCCExtension +// header with increasing sequence numbers to each outgoing packet. +func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + var hdrExtID uint8 + for _, e := range info.RTPHeaderExtensions { + if e.URI == playoutDelayURI { + hdrExtID = uint8(e.ID) + break + } + } + if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID + return writer + } + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + tcc, err := (&Extension{ + minDelay: h.minDelay, + maxDelay: h.maxDelay, + }).Marshal() + if err != nil { + return 0, err + } + err = header.SetExtension(hdrExtID, tcc) + if err != nil { + return 0, err + } + return writer.Write(header, payload, attributes) + }) +} diff --git a/pkg/playoutdelay/header_extension_interceptor_test.go b/pkg/playoutdelay/header_extension_interceptor_test.go new file mode 100644 index 00000000..20335ed1 --- /dev/null +++ b/pkg/playoutdelay/header_extension_interceptor_test.go @@ -0,0 +1,68 @@ +package playoutdelay + +import ( + "sync" + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestHeaderExtensionInterceptor(t *testing.T) { + t.Run("add playout delay to each packet", func(t *testing.T) { + factory, err := NewHeaderExtensionInterceptor() + assert.NoError(t, err) + + inter, err := factory.NewInterceptor("", 10*time.Millisecond, 20*time.Millisecond) + assert.NoError(t, err) + + pChan := make(chan *rtp.Packet, 10*5) + go func() { + // start some parallel streams using the same interceptor to test for race conditions + var wg sync.WaitGroup + num := 10 + wg.Add(num) + for i := 0; i < num; i++ { + go func(ch chan *rtp.Packet, id uint16) { + stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{ + { + URI: playoutDelayURI, + ID: 1, + }, + }}, inter) + defer func() { + wg.Done() + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})) + select { + case p := <-stream.WrittenRTP(): + assert.Equal(t, seqNum, p.SequenceNumber) + ch <- p + case <-time.After(10 * time.Millisecond): + panic("written rtp packet not found") + } + } + }(pChan, uint16(i+1)) + } + wg.Wait() + close(pChan) + }() + + for p := range pChan { + // Can't check for increasing transport cc sequence number, since we can't ensure ordering between the streams + // on pChan is same as in the interceptor, but at least make sure each packet has a seq nr. + extensionHeader := p.GetExtension(1) + ext := &Extension{} + err = ext.Unmarshal(extensionHeader) + assert.NoError(t, err) + assert.Equal(t, uint16(1), ext.minDelay) + assert.Equal(t, uint16(2), ext.maxDelay) + } + }) +} diff --git a/pkg/playoutdelay/playout_delay.go b/pkg/playoutdelay/playout_delay.go new file mode 100644 index 00000000..aa772ace --- /dev/null +++ b/pkg/playoutdelay/playout_delay.go @@ -0,0 +1,52 @@ +// Package playoutdelay implements the playout delay header extension. +package playoutdelay + +import ( + "encoding/binary" + "errors" +) + +const ( + // transport-wide sequence + playoutDelayExtensionSize = 3 + playoutDelayMaxValue = (1 << 12) - 1 +) + +var ( + errPlayoutDelayInvalidValue = errors.New("invalid playout delay value") + errTooSmall = errors.New("playout delay header extension too short") +) + +// Extension is a extension payload format in +// https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | ID | len=2 | MIN delay | MAX delay | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +type Extension struct { + minDelay, maxDelay uint16 +} + +// Marshal serializes the members to buffer +func (p Extension) Marshal() ([]byte, error) { + if p.minDelay >= playoutDelayMaxValue || p.maxDelay >= playoutDelayMaxValue { + return nil, errPlayoutDelayInvalidValue + } + + return []byte{ + byte(p.minDelay >> 4), + byte(p.minDelay<<4) | byte(p.maxDelay>>8), + byte(p.maxDelay), + }, nil +} + +// Unmarshal parses the passed byte slice and stores the result in the members +func (p *Extension) Unmarshal(rawData []byte) error { + if len(rawData) < playoutDelayExtensionSize { + return errTooSmall + } + p.minDelay = binary.BigEndian.Uint16(rawData[0:2]) >> 4 + p.maxDelay = binary.BigEndian.Uint16(rawData[1:3]) & 0x0FFF + return nil +}