diff --git a/gumble/audiocodec.go b/gumble/audiocodec.go
index 2f2bd59..e1a791f 100644
--- a/gumble/audiocodec.go
+++ b/gumble/audiocodec.go
@@ -51,5 +51,9 @@ type AudioEncoder interface {
 type AudioDecoder interface {
 	ID() int
 	Decode(data []byte, frameSize int) ([]int16, error)
+	// SampleSize returns the number of samples in each frame of data.
+	SampleSize(data []byte) (int, error)
+	// CountFrames returns the number of frames contained in data.
+	CountFrames(data []byte) (int, error)
 	Reset()
 }
diff --git a/gumble/audiolisteners.go b/gumble/audiolisteners.go
index 83c528a..929eab5 100644
--- a/gumble/audiolisteners.go
+++ b/gumble/audiolisteners.go
@@ -4,7 +4,6 @@ type audioEventItem struct {
 	parent     *AudioListeners
 	prev, next *audioEventItem
 	listener   AudioListener
-	streams    map[*User]chan *AudioPacket
 }
 
 func (e *audioEventItem) Detach() {
@@ -32,7 +31,6 @@ func (e *AudioListeners) Attach(listener AudioListener) Detacher {
 		parent:   e,
 		prev:     e.tail,
 		listener: listener,
-		streams:  make(map[*User]chan *AudioPacket),
 	}
 	if e.head == nil {
 		e.head = item
diff --git a/gumble/handlers.go b/gumble/handlers.go
index c3884f0..663c2b8 100644
--- a/gumble/handlers.go
+++ b/gumble/handlers.go
@@ -114,7 +114,6 @@ func (c *Client) handleUDPTunnel(buffer []byte) error {
 
 	// Sequence
-	// TODO: use in jitter buffer
-	_, n = varint.Decode(buffer)
+	seq, n := varint.Decode(buffer)
 	if n <= 0 {
 		return errInvalidProtobuf
 	}
@@ -126,57 +125,36 @@ func (c *Client) handleUDPTunnel(buffer []byte) error {
 		return errInvalidProtobuf
 	}
 	buffer = buffer[n:]
 	// Opus audio packets set the 13th bit in the size field as the terminator.
+	isLast := int(length)&0x2000 != 0
 	audioLength := int(length) &^ 0x2000
 	if audioLength > len(buffer) {
 		return errInvalidProtobuf
 	}
 
-	pcm, err := decoder.Decode(buffer[:audioLength], AudioMaximumFrameSize)
-	if err != nil {
-		return err
-	}
-
-	event := AudioPacket{
-		Client: c,
-		Sender: user,
+	// Copy the payload: the packet is queued beyond the lifetime of this
+	// call, and buffer belongs to the caller.
+	packet := &jbAudioPacket{
+		Sequence: seq,
+		Client:   c,
+		Sender:   user,
 		Target: &VoiceTarget{
 			ID: uint32(audioTarget),
 		},
-		AudioBuffer: AudioBuffer(pcm),
+		Opus:   append([]byte(nil), buffer[:audioLength]...),
+		IsLast: isLast,
 	}
 
 	if len(buffer)-audioLength == 3*4 {
 		// the packet has positional audio data; 3x float32
 		buffer = buffer[audioLength:]
 
-		event.X = math.Float32frombits(binary.LittleEndian.Uint32(buffer))
-		event.Y = math.Float32frombits(binary.LittleEndian.Uint32(buffer[4:]))
-		event.Z = math.Float32frombits(binary.LittleEndian.Uint32(buffer[8:]))
-		event.HasPosition = true
+		packet.X = math.Float32frombits(binary.LittleEndian.Uint32(buffer))
+		packet.Y = math.Float32frombits(binary.LittleEndian.Uint32(buffer[4:]))
+		packet.Z = math.Float32frombits(binary.LittleEndian.Uint32(buffer[8:]))
+		packet.HasPosition = true
 	}
 
-	c.volatileLock.Lock()
-	c.volatileWg.Wait()
-	for item := c.Config.AudioListeners.head; item != nil; item = item.next {
-		c.volatileLock.Unlock()
-		ch := item.streams[user]
-		if ch == nil {
-			ch = make(chan *AudioPacket)
-			item.streams[user] = ch
-			event := AudioStreamEvent{
-				Client: c,
-				User:   user,
-				C:      ch,
-			}
-			item.listener.OnAudioStream(&event)
-		}
-		ch <- &event
-		c.volatileLock.Lock()
-		c.volatileWg.Wait()
-	}
-	c.volatileLock.Unlock()
-
-	return nil
+	return user.buffer.AddPacket(packet)
 }
 
diff --git a/gumble/jitterbuffer.go b/gumble/jitterbuffer.go
new file mode 100644
index 0000000..c8c3128
--- /dev/null
+++ b/gumble/jitterbuffer.go
@@ -0,0 +1,260 @@
+package gumble
+
+import (
+	"container/heap"
+	"sync"
+	"time"
+)
+
+const (
+	// jitterStartDelay is how long packets are collected before a stream
+	// starts playing.
+	jitterStartDelay = 200 * time.Millisecond
+
+	// jitterConcealFrameSize is the frameSize passed to the decoder when a
+	// missing packet is concealed.
+	// NOTE(review): real packets are decoded with AudioMaximumFrameSize, so
+	// this is presumably a sample count and 30 looks too small; confirm
+	// against the decoder before relying on concealment.
+	jitterConcealFrameSize = 30
+
+	// jitterConcealDelay is how long playback pauses after a concealment
+	// frame has been delivered.
+	jitterConcealDelay = 30 * time.Millisecond
+)
+
+// jbAudioPacket is an encoded audio packet queued in a jitterBuffer.
+type jbAudioPacket struct {
+	Sequence int64
+	Client   *Client
+	Sender   *User
+	Target   *VoiceTarget
+
+	// Opus is the encoded payload. It may be empty.
+	Opus []byte
+
+	// Samples is the number of samples per frame of Opus. It is filled in
+	// by jitterBuffer.AddPacket.
+	Samples int
+
+	HasPosition bool
+	X, Y, Z     float32
+
+	// IsLast marks the final packet of a transmission.
+	IsLast bool
+}
+
+// jitterBufferHeap is a min-heap of packets ordered by sequence number.
+type jitterBufferHeap []*jbAudioPacket
+
+func (h jitterBufferHeap) Len() int           { return len(h) }
+func (h jitterBufferHeap) Less(i, j int) bool { return h[i].Sequence < h[j].Sequence }
+func (h jitterBufferHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *jitterBufferHeap) Push(x interface{}) {
+	*h = append(*h, x.(*jbAudioPacket))
+}
+
+func (h *jitterBufferHeap) Pop() interface{} {
+	old := *h
+	last := len(old) - 1
+	x := old[last]
+	old[last] = nil // drop the reference so the packet can be collected
+	*h = old[:last]
+	return x
+}
+
+// jitterBuffer reorders the audio packets of a single user and replays them
+// to the client's audio listeners.
+type jitterBuffer struct {
+	user   *User
+	jitter time.Duration
+
+	// mu guards heap and running. running is true while a process goroutine
+	// is draining heap.
+	mu      sync.Mutex
+	heap    jitterBufferHeap
+	running bool
+}
+
+// newJitterBuffer creates an empty jitter buffer for user.
+func newJitterBuffer(user *User) *jitterBuffer {
+	return &jitterBuffer{
+		user:   user,
+		jitter: jitterStartDelay,
+	}
+}
+
+// AddPacket queues ap for playback and starts the playback goroutine if it
+// is not already running. It returns an error if the decoder cannot inspect
+// the payload.
+func (j *jitterBuffer) AddPacket(ap *jbAudioPacket) error {
+	// An empty payload carries no audio, so there is nothing to inspect.
+	if len(ap.Opus) > 0 {
+		samples, err := j.user.decoder.SampleSize(ap.Opus)
+		if err != nil {
+			return err
+		}
+		ap.Samples = samples
+	}
+
+	j.mu.Lock()
+	defer j.mu.Unlock()
+	heap.Push(&j.heap, ap)
+	if !j.running {
+		j.running = true
+		go j.process(ap.Client)
+	}
+	return nil
+}
+
+// process plays streams until the buffer is empty. The emptiness check and
+// the clearing of running happen under the lock AddPacket holds while
+// queueing, so a packet is never left behind without a goroutine to play it.
+func (j *jitterBuffer) process(client *Client) {
+	for {
+		j.playStream(client)
+
+		j.mu.Lock()
+		if len(j.heap) == 0 {
+			j.running = false
+			j.mu.Unlock()
+			return
+		}
+		j.mu.Unlock()
+	}
+}
+
+// playStream waits for the jitter delay, announces a new audio stream to
+// every listener and delivers buffered packets in sequence order. It returns
+// once the packet flagged IsLast has been handled or the buffer runs dry;
+// the stream channels are closed on return.
+func (j *jitterBuffer) playStream(client *Client) {
+	// TODO: do not depend on IsLast to end a stream.
+	time.Sleep(j.jitter)
+
+	// Start from the oldest packet collected during the delay.
+	j.mu.Lock()
+	if len(j.heap) == 0 {
+		j.mu.Unlock()
+		return
+	}
+	seq := j.heap[0].Sequence
+	j.mu.Unlock()
+
+	chans := j.openStreams(client)
+	defer func() {
+		for _, ch := range chans {
+			close(ch)
+		}
+	}()
+
+	decoder := j.user.decoder
+	for {
+		head, lost := j.pop(seq)
+		if head == nil {
+			return
+		}
+
+		event := AudioPacket{
+			Client: client,
+			Sender: j.user,
+			Target: head.Target,
+		}
+		var pause time.Duration
+		if lost {
+			// head is still queued: resynchronise on it and try to fill
+			// the gap with concealment audio.
+			seq = head.Sequence
+			pcm, err := decoder.Decode(nil, jitterConcealFrameSize) // TODO length of silence
+			if err != nil {
+				continue // nothing to conceal with; head plays on the next pass
+			}
+			event.AudioBuffer = AudioBuffer(pcm)
+			pause = jitterConcealDelay
+		} else {
+			// A packet always advances playback by at least one sequence
+			// number, even if it is empty or cannot be parsed.
+			frames := 1
+			var pcm []int16
+			if len(head.Opus) > 0 {
+				if n, err := decoder.CountFrames(head.Opus); err == nil && n > 0 {
+					frames = n
+				}
+				// A packet that fails to decode is skipped like an empty one.
+				if out, err := decoder.Decode(head.Opus, AudioMaximumFrameSize); err == nil {
+					pcm = out
+				}
+			}
+			seq = head.Sequence + int64(frames)
+			if pcm == nil {
+				if head.IsLast {
+					return
+				}
+				continue
+			}
+
+			event.AudioBuffer = AudioBuffer(pcm)
+			event.HasPosition = head.HasPosition
+			event.X, event.Y, event.Z = head.X, head.Y, head.Z
+			samples := time.Duration(head.Samples * frames)
+			pause = samples * time.Second / time.Duration(AudioSampleRate)
+		}
+
+		for _, ch := range chans {
+			ch <- &event
+		}
+		time.Sleep(pause)
+		if !lost && head.IsLast {
+			return
+		}
+	}
+}
+
+// openStreams creates one channel per registered audio listener and hands it
+// to the listener through OnAudioStream. volatileLock is released while each
+// listener runs.
+func (j *jitterBuffer) openStreams(client *Client) []chan *AudioPacket {
+	var chans []chan *AudioPacket
+	client.volatileLock.Lock()
+	client.volatileWg.Wait()
+	for item := client.Config.AudioListeners.head; item != nil; item = item.next {
+		client.volatileLock.Unlock()
+		ch := make(chan *AudioPacket)
+		chans = append(chans, ch)
+		event := AudioStreamEvent{
+			Client: client,
+			User:   j.user,
+			C:      ch,
+		}
+		item.listener.OnAudioStream(&event)
+		client.volatileLock.Lock()
+		client.volatileWg.Wait()
+	}
+	client.volatileLock.Unlock()
+	return chans
+}
+
+// pop selects what to play next given that playback has reached seq.
+// Packets older than seq are discarded. It returns nil when the buffer is
+// empty. Otherwise it returns the oldest packet: removed from the buffer when
+// lost is false, or left queued with lost set when it is more than one
+// sequence number ahead of seq, meaning an earlier packet is missing.
+func (j *jitterBuffer) pop(seq int64) (head *jbAudioPacket, lost bool) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+
+	// Throw out packets we have already passed due to a delay.
+	for len(j.heap) > 0 && j.heap[0].Sequence < seq {
+		heap.Pop(&j.heap)
+	}
+	if len(j.heap) == 0 {
+		return nil, false
+	}
+	head = j.heap[0]
+	if seq+1 < head.Sequence {
+		return head, true
+	}
+	heap.Pop(&j.heap)
+	return head, false
+}
diff --git a/gumble/user.go b/gumble/user.go
index 289d510..78f51c6 100644
--- a/gumble/user.go
+++ b/gumble/user.go
@@ -44,9 +44,12 @@ type User struct {
 	// The user's texture hash. nil if User.Texture has been populated.
 	TextureHash []byte
 
-	// The user's stats. Containts nil if the stats have not yet been requested.
+	// The user's stats. Contains nil if the stats have not yet been requested.
 	Stats *UserStats
 
+	// The user's jitter buffer, audio is held here for smoother playback.
+	buffer *jitterBuffer
+
 	client  *Client
 	decoder AudioDecoder
 }
diff --git a/gumble/users.go b/gumble/users.go
index a8ed79c..525e044 100644
--- a/gumble/users.go
+++ b/gumble/users.go
@@ -13,7 +13,8 @@ type Users map[uint32]*User
 func (u Users) create(session uint32) *User {
 	user := &User{
 		Session: session,
 	}
+	user.buffer = newJitterBuffer(user)
 	u[session] = user
 	return user
 }
diff --git a/opus/opus.go b/opus/opus.go
index ce1e018..30a96db 100644
--- a/opus/opus.go
+++ b/opus/opus.go
@@ -69,7 +69,17 @@ func (*Decoder) ID() int {
 func (d *Decoder) Decode(data []byte, frameSize int) ([]int16, error) {
 	return d.Decoder.Decode(data, frameSize, false)
 }
 
+// SampleSize returns the number of samples per frame of the Opus packet data.
+func (d *Decoder) SampleSize(data []byte) (int, error) {
+	return gopus.GetSamplesPerFrame(data, gumble.AudioSampleRate)
+}
+
+// CountFrames returns the number of frames in the Opus packet data.
+func (d *Decoder) CountFrames(data []byte) (int, error) {
+	return gopus.CountFrames(data)
+}
+
 func (d *Decoder) Reset() {
 	d.Decoder.ResetState()
 }