Skip to content

Commit

Permalink
feat: Add playout delay header interceptor
Browse files Browse the repository at this point in the history
This interceptor adds the playout delay header extension following
https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
  • Loading branch information
kevmo314 committed Jul 29, 2022
1 parent a82b843 commit aab5891
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 0 deletions.
60 changes: 60 additions & 0 deletions pkg/playoutdelay/header_extension_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package playoutdelay

import (
"github.com/pion/interceptor"
"github.com/pion/rtp"
"time"
)

// HeaderExtensionInterceptorFactory is a interceptor.Factory for a HeaderExtensionInterceptor
type HeaderExtensionInterceptorFactory struct{}

// NewInterceptor constructs a new HeaderExtensionInterceptor
func (h *HeaderExtensionInterceptorFactory) NewInterceptor(id string, minDelay, maxDelay time.Duration) (interceptor.Interceptor, error) {
if minDelay.Milliseconds() < 0 || minDelay.Milliseconds() > 40950 || maxDelay.Milliseconds() < 0 || maxDelay.Milliseconds() > 40950 {
return nil, errPlayoutDelayInvalidValue
}
return &HeaderExtensionInterceptor{minDelay: uint16(minDelay.Milliseconds() / 10), maxDelay: uint16(maxDelay.Milliseconds() / 10)}, nil
}

// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptorFactory
func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptorFactory, error) {
return &HeaderExtensionInterceptorFactory{}, nil
}

// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet
type HeaderExtensionInterceptor struct {
interceptor.NoOp
minDelay, maxDelay uint16
}

const playoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay"

// BindLocalStream returns a writer that adds a rtp.TransportCCExtension
// header with increasing sequence numbers to each outgoing packet.
func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
var hdrExtID uint8
for _, e := range info.RTPHeaderExtensions {
if e.URI == playoutDelayURI {
hdrExtID = uint8(e.ID)
break
}
}
if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID
return writer
}
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
tcc, err := (&PlayoutDelayExtension{
minDelay: h.minDelay,
maxDelay: h.maxDelay,
}).Marshal()
if err != nil {
return 0, err
}
err = header.SetExtension(hdrExtID, tcc)
if err != nil {
return 0, err
}
return writer.Write(header, payload, attributes)
})
}
68 changes: 68 additions & 0 deletions pkg/playoutdelay/header_extension_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package playoutdelay

import (
"sync"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestHeaderExtensionInterceptor(t *testing.T) {
t.Run("add playout delay to each packet", func(t *testing.T) {
factory, err := NewHeaderExtensionInterceptor()
assert.NoError(t, err)

inter, err := factory.NewInterceptor("", 10*time.Millisecond, 20*time.Millisecond)
assert.NoError(t, err)

pChan := make(chan *rtp.Packet, 10*5)
go func() {
// start some parallel streams using the same interceptor to test for race conditions
var wg sync.WaitGroup
num := 10
wg.Add(num)
for i := 0; i < num; i++ {
go func(ch chan *rtp.Packet, id uint16) {
stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{
{
URI: playoutDelayURI,
ID: 1,
},
}}, inter)
defer func() {
wg.Done()
assert.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} {
assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
select {
case p := <-stream.WrittenRTP():
assert.Equal(t, seqNum, p.SequenceNumber)
ch <- p
case <-time.After(10 * time.Millisecond):
panic("written rtp packet not found")
}
}
}(pChan, uint16(i+1))
}
wg.Wait()
close(pChan)
}()

for p := range pChan {
// Can't check for increasing transport cc sequence number, since we can't ensure ordering between the streams
// on pChan is same as in the interceptor, but at least make sure each packet has a seq nr.
extensionHeader := p.GetExtension(1)
ext := &PlayoutDelayExtension{}
err = ext.Unmarshal(extensionHeader)
assert.NoError(t, err)
assert.Equal(t, uint16(1), ext.minDelay)
assert.Equal(t, uint16(2), ext.maxDelay)
}
})
}
52 changes: 52 additions & 0 deletions pkg/playoutdelay/playout_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Package playoutdelay implements the playout delay header extension.
package playoutdelay

import (
"encoding/binary"
"errors"
)

const (
// transport-wide sequence
playoutDelayExtensionSize = 3
playoutDelayMaxValue = (1 << 12) - 1
)

var (
errPlayoutDelayInvalidValue = errors.New("invalid playout delay value")
errTooSmall = errors.New("playout delay header extension too short")
)

// PlayoutDelayExtension is a extension payload format in
// https://webrtc.googlesource.com/src/+/refs/heads/main/docs/native-code/rtp-hdrext/playout-delay
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | len=2 | MIN delay | MAX delay |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
type PlayoutDelayExtension struct {
minDelay, maxDelay uint16
}

// Marshal serializes the members to buffer
func (p PlayoutDelayExtension) Marshal() ([]byte, error) {
if p.minDelay >= playoutDelayMaxValue || p.maxDelay >= playoutDelayMaxValue {
return nil, errPlayoutDelayInvalidValue
}

return []byte{
byte(p.minDelay >> 4),
byte(p.minDelay<<4) | byte(p.maxDelay>>8),
byte(p.maxDelay),
}, nil
}

// Unmarshal parses the passed byte slice and stores the result in the members
func (p *PlayoutDelayExtension) Unmarshal(rawData []byte) error {
if len(rawData) < playoutDelayExtensionSize {
return errTooSmall
}
p.minDelay = binary.BigEndian.Uint16(rawData[0:2]) >> 4
p.maxDelay = binary.BigEndian.Uint16(rawData[1:3]) & 0x0FFF
return nil
}

0 comments on commit aab5891

Please sign in to comment.