Skip to content

Commit

Permalink
Improve HLS handling and format for streaming server
Browse files Browse the repository at this point in the history
Enhanced HLS management in the streaming server codebase. Fixed discontinuity in HLS tag sequences, introduced new HLS sequence indexing, and updated media type in responses. Comments formatting were also improved for better code readability and understanding.
  • Loading branch information
deepch committed May 17, 2024
1 parent 9d6c9ec commit 830a83d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
9 changes: 5 additions & 4 deletions apiHTTPHLS.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/sirupsen/logrus"
)

//HTTPAPIServerStreamHLSM3U8 send client m3u8 play list
// HTTPAPIServerStreamHLSM3U8 send client m3u8 play list
func HTTPAPIServerStreamHLSM3U8(c *gin.Context) {
requestLogger := log.WithFields(logrus.Fields{
"module": "http_hls",
Expand All @@ -33,7 +33,7 @@ func HTTPAPIServerStreamHLSM3U8(c *gin.Context) {
return
}

c.Header("Content-Type", "application/x-mpegURL")
c.Header("Content-Type", "application/vnd.apple.mpegurl")
Storage.StreamChannelRun(c.Param("uuid"), c.Param("channel"))
//If stream mode on_demand need wait ready segment's
for i := 0; i < 40; i++ {
Expand All @@ -45,7 +45,7 @@ func HTTPAPIServerStreamHLSM3U8(c *gin.Context) {
}).Errorln(err.Error())
return
}
if seq >= 6 {
if seq >= 5 {
_, err := c.Writer.Write([]byte(index))
if err != nil {
c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()})
Expand All @@ -60,7 +60,7 @@ func HTTPAPIServerStreamHLSM3U8(c *gin.Context) {
}
}

//HTTPAPIServerStreamHLSTS send client ts segment
// HTTPAPIServerStreamHLSTS send client ts segment
func HTTPAPIServerStreamHLSTS(c *gin.Context) {
requestLogger := log.WithFields(logrus.Fields{
"module": "http_hls",
Expand All @@ -84,6 +84,7 @@ func HTTPAPIServerStreamHLSTS(c *gin.Context) {
}).Errorln(err.Error())
return
}
c.Header("Content-Type", "video/MP2T")
outfile := bytes.NewBuffer([]byte{})
Muxer := ts.NewMuxer(outfile)
Muxer.PaddingToMakeCounterCont = true
Expand Down
19 changes: 12 additions & 7 deletions storageStreamHLS.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,50 +8,54 @@ import (
"github.com/deepch/vdk/av"
)

//StreamHLSAdd add hls seq to buffer
// StreamHLSAdd add hls seq to buffer
func (obj *StorageST) StreamHLSAdd(uuid string, channelID string, val []*av.Packet, dur time.Duration) {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if tmp, ok := obj.Streams[uuid]; ok {
if channelTmp, ok := tmp.Channels[channelID]; ok {
channelTmp.hlsSegmentNumber++
channelTmp.hlsSegmentBuffer[channelTmp.hlsSegmentNumber] = SegmentOld{data: val, dur: dur}
channelTmp.hlsLastDur = int(dur.Seconds())
if len(channelTmp.hlsSegmentBuffer) >= 6 {
delete(channelTmp.hlsSegmentBuffer, channelTmp.hlsSegmentNumber-6-1)
delete(channelTmp.hlsSegmentBuffer, channelTmp.hlsSegmentNumber-5)
channelTmp.hlsSequence++
}
tmp.Channels[channelID] = channelTmp
obj.Streams[uuid] = tmp
}
}
}

//StreamHLSm3u8 get hls m3u8 list
// StreamHLSm3u8 get hls m3u8 list
func (obj *StorageST) StreamHLSm3u8(uuid string, channelID string) (string, int, error) {
obj.mutex.RLock()
defer obj.mutex.RUnlock()
if tmp, ok := obj.Streams[uuid]; ok {
if channelTmp, ok := tmp.Channels[channelID]; ok {
var out string
//TODO fix it
out += "#EXTM3U\r\n#EXT-X-TARGETDURATION:4\r\n#EXT-X-VERSION:4\r\n#EXT-X-MEDIA-SEQUENCE:" + strconv.Itoa(channelTmp.hlsSegmentNumber) + "\r\n"
out += "#EXTM3U\r\n#EXT-X-TARGETDURATION:" + strconv.Itoa(channelTmp.hlsLastDur) + "\r\n#EXT-X-VERSION:4\r\n#EXT-X-MEDIA-SEQUENCE:" + strconv.Itoa(channelTmp.hlsSequence) + "\r\n"
var keys []int
for k := range channelTmp.hlsSegmentBuffer {
keys = append(keys, k)
}
sort.Ints(keys)
var count int
for _, i := range keys {
if i == 2 {
out += "#EXT-X-DISCONTINUITY\r\n"
}
count++
out += "#EXTINF:" + strconv.FormatFloat(channelTmp.hlsSegmentBuffer[i].dur.Seconds(), 'f', 1, 64) + ",\r\nsegment/" + strconv.Itoa(i) + "/file.ts\r\n"

}
return out, count, nil
}
}
return "", 0, ErrorStreamNotFound
}

//StreamHLSTS send hls segment buffer to clients
// StreamHLSTS send hls segment buffer to clients
func (obj *StorageST) StreamHLSTS(uuid string, channelID string, seq int) ([]*av.Packet, error) {
obj.mutex.RLock()
defer obj.mutex.RUnlock()
Expand All @@ -65,14 +69,15 @@ func (obj *StorageST) StreamHLSTS(uuid string, channelID string, seq int) ([]*av
return nil, ErrorStreamNotFound
}

//StreamHLSFlush delete hls cache
// StreamHLSFlush delete hls cache
func (obj *StorageST) StreamHLSFlush(uuid string, channelID string) {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if tmp, ok := obj.Streams[uuid]; ok {
if channelTmp, ok := tmp.Channels[channelID]; ok {
channelTmp.hlsSegmentBuffer = make(map[int]SegmentOld)
channelTmp.hlsSegmentNumber = 0
channelTmp.hlsSequence = 0
tmp.Channels[channelID] = channelTmp
obj.Streams[uuid] = tmp
}
Expand Down
20 changes: 11 additions & 9 deletions storageStruct.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ import (

var Storage = NewStreamCore()

//Default stream type
// Default stream type
const (
MSE = iota
WEBRTC
RTSP
)

//Default stream status type
// Default stream status type
const (
OFFLINE = iota
ONLINE
)

//Default stream errors
// Default stream errors
var (
Success = "success"
ErrorStreamNotFound = errors.New("stream not found")
Expand All @@ -43,15 +43,15 @@ var (
ErrorStreamUnauthorized = errors.New("stream request unauthorized")
)

//StorageST main storage struct
// StorageST main storage struct
type StorageST struct {
mutex sync.RWMutex
Server ServerST `json:"server" groups:"api,config"`
Streams map[string]StreamST `json:"streams,omitempty" groups:"api,config"`
ChannelDefaults ChannelST `json:"channel_defaults,omitempty" groups:"api,config"`
}

//ServerST server storage section
// ServerST server storage section
type ServerST struct {
Debug bool `json:"debug" groups:"api,config"`
LogLevel logrus.Level `json:"log_level" groups:"api,config"`
Expand All @@ -76,13 +76,13 @@ type ServerST struct {
WebRTCPortMax uint16 `json:"webrtc_port_max" groups:"api,config"`
}

//Token auth
// Token auth
type Token struct {
Enable bool `json:"enable" groups:"api,config"`
Backend string `json:"backend" groups:"api,config"`
}

//ServerST stream storage section
// ServerST stream storage section
type StreamST struct {
Name string `json:"name,omitempty" groups:"api,config"`
Channels map[string]ChannelST `json:"channels,omitempty" groups:"api,config"`
Expand All @@ -102,12 +102,14 @@ type ChannelST struct {
signals chan int
hlsSegmentBuffer map[int]SegmentOld
hlsSegmentNumber int
hlsSequence int
hlsLastDur int
clients map[string]ClientST
ack time.Time
hlsMuxer *MuxerHLS `json:"-"`
}

//ClientST client storage section
// ClientST client storage section
type ClientST struct {
mode int
signals chan int
Expand All @@ -116,7 +118,7 @@ type ClientST struct {
socket net.Conn
}

//SegmentOld HLS cache section
// SegmentOld HLS cache section
type SegmentOld struct {
dur time.Duration
data []*av.Packet
Expand Down

0 comments on commit 830a83d

Please sign in to comment.