Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Static jitter buffer #34

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion gumble/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ type AudioPacket struct {
Target *VoiceTarget

AudioBuffer

HasPosition bool
X, Y, Z float32
}
2 changes: 2 additions & 0 deletions gumble/audiocodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,7 @@ type AudioEncoder interface {
type AudioDecoder interface {
ID() int
Decode(data []byte, frameSize int) ([]int16, error)
SampleSize(data []byte) (int, error)
CountFrames(data []byte) (int, error)
Reset()
}
4 changes: 2 additions & 2 deletions gumble/audiolisteners.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type audioEventItem struct {
parent *AudioListeners
prev, next *audioEventItem
listener AudioListener
streams map[*User]chan *AudioPacket
//streams map[*User]chan *AudioPacket
}

func (e *audioEventItem) Detach() {
Expand Down Expand Up @@ -32,7 +32,7 @@ func (e *AudioListeners) Attach(listener AudioListener) Detacher {
parent: e,
prev: e.tail,
listener: listener,
streams: make(map[*User]chan *AudioPacket),
//streams: make(map[*User]chan *AudioPacket),
}
if e.head == nil {
e.head = item
Expand Down
61 changes: 18 additions & 43 deletions gumble/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package gumble // import "layeh.com/gumble/gumble"

import (
"crypto/x509"
"encoding/binary"
// "encoding/binary"
"errors"
"math"
"net"
Expand Down Expand Up @@ -100,6 +100,7 @@ func (c *Client) handleUDPTunnel(buffer []byte) error {
if user == nil {
return errInvalidProtobuf
}

decoder := user.decoder
if decoder == nil {
// TODO: decoder pool
Expand All @@ -114,7 +115,7 @@ func (c *Client) handleUDPTunnel(buffer []byte) error {

// Sequence
// TODO: use in jitter buffer
_, n = varint.Decode(buffer)
seq, n := varint.Decode(buffer)
if n <= 0 {
return errInvalidProtobuf
}
Expand All @@ -126,57 +127,31 @@ func (c *Client) handleUDPTunnel(buffer []byte) error {
return errInvalidProtobuf
}
buffer = buffer[n:]
// Grab terminator bit
isLast := int(length)&0x2000 > 0
// Opus audio packets set the 13th bit in the size field as the terminator.
audioLength := int(length) &^ 0x2000
if audioLength > len(buffer) {
return errInvalidProtobuf
}

pcm, err := decoder.Decode(buffer[:audioLength], AudioMaximumFrameSize)
if err != nil {
return err
var opusData []byte
if audioLength > 0 {
opusData = make([]byte, audioLength)
copy(opusData, buffer[:audioLength])
}

event := AudioPacket{
Client: c,
Sender: user,
if err := user.buffer.AddPacket(&jbAudioPacket{
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is user.buffer being initialised? I'm getting nil pointer dereference error here...

Sequence: seq,
Client: c,
Sender: user,
Target: &VoiceTarget{
ID: uint32(audioTarget),
},
AudioBuffer: AudioBuffer(pcm),
}

if len(buffer)-audioLength == 3*4 {
// the packet has positional audio data; 3x float32
buffer = buffer[audioLength:]

event.X = math.Float32frombits(binary.LittleEndian.Uint32(buffer))
event.Y = math.Float32frombits(binary.LittleEndian.Uint32(buffer[4:]))
event.Z = math.Float32frombits(binary.LittleEndian.Uint32(buffer[8:]))
event.HasPosition = true
}

c.volatileLock.Lock()
c.volatileWg.Wait()
for item := c.Config.AudioListeners.head; item != nil; item = item.next {
c.volatileLock.Unlock()
ch := item.streams[user]
if ch == nil {
ch = make(chan *AudioPacket)
item.streams[user] = ch
event := AudioStreamEvent{
Client: c,
User: user,
C: ch,
}
item.listener.OnAudioStream(&event)
}
ch <- &event
c.volatileLock.Lock()
c.volatileWg.Wait()
Opus: opusData,
IsLast: isLast,
Length: audioLength,
}); err != nil {
return err
}
c.volatileLock.Unlock()

return nil
}

Expand Down
216 changes: 216 additions & 0 deletions gumble/jitterbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package gumble

import (
"container/heap"
"encoding/binary"
_ "fmt"
_ "log"
"math"
_ "os"
"sync"
"time"
)

const (
// JitterStartDelay is the starting delay we will start our buffer with
jitterStartDelay = 100 * time.Millisecond
)

// jbAudioPacket holds pre-decoded audio samples
type jbAudioPacket struct {
Sequence int64
Client *Client
Sender *User
Target *VoiceTarget
Samples int
Opus []byte
Length int

HasPosition bool
X, Y, Z float32
IsLast bool
}

type jitterBufferHeap []*jbAudioPacket

func (h jitterBufferHeap) Len() int { return len(h) }
func (h jitterBufferHeap) Less(i, j int) bool { return h[i].Sequence < h[j].Sequence }
func (h jitterBufferHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *jitterBufferHeap) Push(x interface{}) {
*h = append(*h, x.(*jbAudioPacket))
}

func (h *jitterBufferHeap) Pop() interface{} {
old := *h
x := old[len(old)-1]
*h = old[:len(old)-1]
return x
}

// jitterBuffer struct holds all information to run the jitter buffer
// seq is the current sequence number of the audio packets we have sent to be played
//...
type jitterBuffer struct {
seq int64
delay *time.Timer
jitter time.Duration
heap jitterBufferHeap
bufferSamples int64
running bool
user *User
target *VoiceTarget
client *Client
HeapLock *sync.Mutex
RunningLock *sync.Mutex
}

// Creates a new jitterBuffer, starts the go routine to handle all packets
func newJitterBuffer() *jitterBuffer {
jb := &jitterBuffer{
running: false,
seq: -1,
jitter: jitterStartDelay,
HeapLock: &sync.Mutex{},
RunningLock: &sync.Mutex{},
}
//heap.Init(jb.heap)
//jb.process() ???
return jb
}

// AddPacket adds a packet to the jitter buffer
func (j *jitterBuffer) AddPacket(ap *jbAudioPacket) error {
samples, err := j.user.decoder.SampleSize(ap.Opus)
if err != nil {
//fmt.Fprintf(os.Stderr, "%s %v\n", err, ap.Opus)
return err
}
ap.Samples = samples
j.bufferSamples += int64(ap.Samples)
j.HeapLock.Lock()
heap.Push(&j.heap, ap)
j.HeapLock.Unlock()
if !j.running {
j.RunningLock.Lock()
j.running = true
j.RunningLock.Unlock()
if j.seq == -1 || len(j.heap) == 1 { // set our sequence to first received audio packet's sequence if buffer is empty (-1)
//println("Heap empty, or -1, sequence is now at ", ap.Sequence)
j.seq = ap.Sequence
}
j.client = ap.Client
go j.process()
}
return nil
}

// TODO Kill stream if no audio for X seconds, don't depend on is last
// Should only be called once internally, handles all packets.
func (j *jitterBuffer) process() {
time.Sleep(j.jitter)
var chans []chan *AudioPacket
j.client.volatileLock.Lock()
j.client.volatileWg.Wait()
for item := j.client.Config.AudioListeners.head; item != nil; item = item.next {
j.client.volatileLock.Unlock()
ch := make(chan *AudioPacket)
defer close(ch)
chans = append(chans, ch)
event := AudioStreamEvent{
Client: j.client,
User: j.user,
C: ch,
}
item.listener.OnAudioStream(&event)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit is broken. Creating the AudioPacket channel and calling OnAudioStream() should only happen once. The client has a goroutine running listening on the channel, so it doesn't make sense to be closing it every time the heap size goes to zero (which happens when nobody is speaking).

j.client.volatileLock.Lock()
j.client.volatileWg.Wait()
}
j.client.volatileLock.Unlock()
for {

// Handle jitter buffer/*|| float64((j.bufferSamples/(AudioSampleRate/1000))) < j.delayMs.Seconds()*1000*/
/*if len(j.heap) == 0 {
j.RunningLock.Lock()
j.running = false
j.RunningLock.Unlock()
break // TODO handle better (could run out w/ more audio on the way)
}*/
if len(j.heap) == 0 {
continue // TODO Not right? :\
}
j.HeapLock.Lock()
if j.target != j.heap[0].Target {
j.target = j.heap[0].Target
}
j.HeapLock.Unlock()
if j.heap[0].Sequence < j.seq { // Throw the packet out if we have passed it due to a delay
j.HeapLock.Lock()
_ = heap.Pop(&j.heap)
j.HeapLock.Unlock()
continue
}
var pcm []int16
var nextPacket *jbAudioPacket

j.HeapLock.Lock()
//println(j.seq, " ", j.heap[0].Sequence)
if j.seq+1 < j.heap[0].Sequence { // Send a null packet with the missing sequence, used for loss concealment
//println("Delayed packet, expected: ", j.seq+1, "got: ", j.heap[0].Sequence)
var err error
j.seq = j.heap[0].Sequence
pcm, err = j.user.decoder.Decode(nil, 30) // TODO length of silence
if err != nil {
//fmt.Fprintf(os.Stderr, "%s\n", err)
j.HeapLock.Unlock()
continue // skip!
}
} else {
var err error
nextPacket = heap.Pop(&j.heap).(*jbAudioPacket)
pcm, err = j.user.decoder.Decode(nextPacket.Opus[:nextPacket.Length], AudioMaximumFrameSize)
if err != nil {

//fmt.Fprintf(os.Stderr, "%s %v\n", err, nextPacket.Opus[:nextPacket.Length])
frames, _ := j.user.decoder.CountFrames(nextPacket.Opus)
j.seq = nextPacket.Sequence + int64(frames)
j.HeapLock.Unlock()
continue
}
}
j.HeapLock.Unlock()
if nextPacket != nil {
frames, _ := j.user.decoder.CountFrames(nextPacket.Opus)
j.seq = nextPacket.Sequence + int64(frames)
}

event := AudioPacket{
Client: j.client,
Sender: j.user,
Target: j.target,
AudioBuffer: AudioBuffer(pcm),
}

if nextPacket != nil && len(nextPacket.Opus)-nextPacket.Length == 3*4 {
// the packet has positional audio data; 3x float32
event.X = math.Float32frombits(binary.LittleEndian.Uint32(nextPacket.Opus))
event.Y = math.Float32frombits(binary.LittleEndian.Uint32(nextPacket.Opus[4:]))
event.Z = math.Float32frombits(binary.LittleEndian.Uint32(nextPacket.Opus[8:]))
event.HasPosition = true
}
for _, ch := range chans {
ch <- &event
}
if nextPacket != nil {
time.Sleep(time.Duration(nextPacket.Samples/(AudioSampleRate/1000)) * time.Millisecond)
} else {
time.Sleep(30 * time.Millisecond)
}
if (nextPacket != nil && nextPacket.IsLast) || (len(j.heap) == 0) { // we can wait for our next packets now
j.RunningLock.Lock()
j.running = false
j.RunningLock.Unlock()
break
}
}
}
5 changes: 4 additions & 1 deletion gumble/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ type User struct {
// The user's texture hash. nil if User.Texture has been populated.
TextureHash []byte

// The user's stats. Containts nil if the stats have not yet been requested.
// The user's stats. Contains nil if the stats have not yet been requested.
Stats *UserStats

// The user's jitter buffer, audio is held here for smoother playback.
buffer *jitterBuffer

client *Client
decoder AudioDecoder
}
Expand Down
2 changes: 2 additions & 0 deletions gumble/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ type Users map[uint32]*User
func (u Users) create(session uint32) *User {
user := &User{
Session: session,
buffer: newJitterBuffer(),
}
user.buffer.user = user
u[session] = user
return user
}
Expand Down
7 changes: 6 additions & 1 deletion opus/opus.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ func (*Decoder) ID() int {
func (d *Decoder) Decode(data []byte, frameSize int) ([]int16, error) {
return d.Decoder.Decode(data, frameSize, false)
}

func (d *Decoder) SampleSize(data []byte) (int, error) {
return gopus.GetSamplesPerFrame(data, gumble.AudioSampleRate)
}
func (d *Decoder) CountFrames(data []byte) (int, error) {
return gopus.CountFrames(data)
}
func (d *Decoder) Reset() {
d.Decoder.ResetState()
}