From 5cc45737c5a29a5ee0d7b70991f1c11175cf4bfb Mon Sep 17 00:00:00 2001 From: Andrey Semochkin Date: Sat, 15 Aug 2020 23:16:59 +0300 Subject: [PATCH] fix all war and style --- main.go => RTSPtoWeb.go | 0 apiHTTPHLS.go | 57 +++++++++++++++++++----------------- apiHTTPMSE.go | 37 +++++++++++++++-------- apiHTTPRouter.go | 40 +++++++++---------------- apiHTTPServer.go | 2 ++ apiHTTPStream.go | 65 ++++++++++++++++++++++++++++++----------- apiHTTPWebRTC.go | 34 ++++++++++++--------- config.json | 15 ++++++++-- loggingLog.go | 1 + storageClient.go | 5 ++-- storageConfig.go | 15 +++++----- storageServer.go | 9 +++--- storageStream.go | 47 ++++++++++++++++++----------- storageStreamHLS.go | 10 +++++-- storageStruct.go | 18 ++++++++++-- streamCore.go | 29 +++++++++--------- 16 files changed, 233 insertions(+), 151 deletions(-) rename main.go => RTSPtoWeb.go (100%) diff --git a/main.go b/RTSPtoWeb.go similarity index 100% rename from main.go rename to RTSPtoWeb.go diff --git a/apiHTTPHLS.go b/apiHTTPHLS.go index b668d32..8481d75 100644 --- a/apiHTTPHLS.go +++ b/apiHTTPHLS.go @@ -2,34 +2,34 @@ package main import ( "bytes" - "log" "time" "github.com/deepch/vdk/format/ts" "github.com/gin-gonic/gin" ) -//ready //HTTPAPIServerStreamHLSTS send client m3u8 play list func HTTPAPIServerStreamHLSM3U8(c *gin.Context) { - uuid := c.Param("uuid") - if !Storage.StreamExist(uuid) { - c.IndentedJSON(500, "Stream Not Found") + if !Storage.StreamExist(c.Param("uuid")) { + c.IndentedJSON(500, Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) return } c.Header("Content-Type", "application/x-mpegURL") - Storage.StreamRun(uuid) + Storage.StreamRun(c.Param("uuid")) //If stream mode on_demand need wait ready segment's for i := 0; i < 40; i++ { - index, seq, err := Storage.StreamHLSm3u8(uuid) + index, seq, err := Storage.StreamHLSm3u8(c.Param("uuid")) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } if seq >= 6 { _, err := c.Writer.Write([]byte(index)) if err != nil { - c.IndentedJSON(400, err.Error()) + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } return @@ -38,53 +38,58 @@ func HTTPAPIServerStreamHLSM3U8(c *gin.Context) { } } -//ready //HTTPAPIServerStreamHLSTS send client ts segment func HTTPAPIServerStreamHLSTS(c *gin.Context) { - uuid := c.Param("uuid") - //Check Has Stream - if !Storage.StreamExist(uuid) { - log.Println("Not Found Error") + if !Storage.StreamExist(c.Param("uuid")) { + c.IndentedJSON(500, Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) return } - outfile := bytes.NewBuffer([]byte{}) - codecs, err := Storage.StreamCodecs(uuid) + codecs, err := Storage.StreamCodecs(c.Param("uuid")) if err != nil { - c.IndentedJSON(500, err.Error()) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } + outfile := bytes.NewBuffer([]byte{}) Muxer := ts.NewMuxer(outfile) Muxer.PaddingToMakeCounterCont = true err = Muxer.WriteHeader(codecs) if err != nil { - c.IndentedJSON(500, err.Error()) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - data, err := Storage.StreamHLSTS(uuid, stringToInt(c.Param("seq"))) + seqData, err := Storage.StreamHLSTS(c.Param("uuid"), stringToInt(c.Param("seq"))) if err != nil { - c.IndentedJSON(500, err.Error()) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - if len(data) == 0 { - c.IndentedJSON(500, "No Segment Found") + if len(seqData) == 0 { + c.IndentedJSON(500, Message{Status: 0, Payload: ErrorStreamNotHLSSegments.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: ErrorStreamNotHLSSegments.Error()}) return } - for _, v := range data { + for _, v := range seqData { v.CompositionTime = 1 err = Muxer.WritePacket(*v) if err != nil { - c.IndentedJSON(500, err.Error()) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } } err = Muxer.WriteTrailer() if err != nil { - c.IndentedJSON(500, err.Error()) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } _, err = c.Writer.Write(outfile.Bytes()) if err != nil { - c.IndentedJSON(400, err.Error()) + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } diff --git a/apiHTTPMSE.go b/apiHTTPMSE.go index 48711e6..89b31f7 100644 --- a/apiHTTPMSE.go +++ b/apiHTTPMSE.go @@ -8,50 +8,59 @@ import ( ) func HTTPAPIServerStreamMSE(ws *websocket.Conn) { - defer ws.Close() - uuid := ws.Request().FormValue("uuid") - if !Storage.StreamExist(uuid) { + defer func() { + err := ws.Close() + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err}) + }() + if !Storage.StreamExist(ws.Request().FormValue("uuid")) { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) return } err := ws.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } - cid, ch, err := Storage.ClientAdd(uuid) + cid, ch, err := Storage.ClientAdd(ws.Request().FormValue("uuid")) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } - defer Storage.ClientDelete(uuid, cid) - Storage.StreamRun(uuid) - codecs, err := Storage.StreamCodecs(uuid) + defer Storage.ClientDelete(ws.Request().FormValue("uuid"), cid) + Storage.StreamRun(ws.Request().FormValue("uuid")) + codecs, err := Storage.StreamCodecs(ws.Request().FormValue("uuid")) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } - muxerMSE := mp4f.NewMuxer(nil) err = muxerMSE.WriteHeader(codecs) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } meta, init := muxerMSE.GetInit(codecs) err = websocket.Message.Send(ws, append([]byte{9}, meta...)) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } err = websocket.Message.Send(ws, init) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } var videoStart bool go func() { + defer func() { + err := ws.Close() + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err}) + }() for { var message string err := websocket.Message.Receive(ws, &message) if err != nil { - err = ws.Close() - if err != nil { - return - } + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } } @@ -60,6 +69,7 @@ func HTTPAPIServerStreamMSE(ws *websocket.Conn) { for { select { case <-noVideo.C: + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: ErrorStreamNoVideo.Error()}) return case pck := <-ch: if pck.IsKeyFrame { @@ -71,15 +81,18 @@ func HTTPAPIServerStreamMSE(ws *websocket.Conn) { } ready, buf, err := muxerMSE.WritePacket(*pck, false) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } if ready { err := ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } err = websocket.Message.Send(ws, buf) if err != nil { + loggingPrintln(ws.Request().FormValue("uuid"), Message{Status: 0, Payload: err.Error()}) return } } diff --git a/apiHTTPRouter.go b/apiHTTPRouter.go index 4155dae..96a7f25 100644 --- a/apiHTTPRouter.go +++ b/apiHTTPRouter.go @@ -1,15 +1,21 @@ package main import ( - "encoding/json" - "log" "net/http" + "os" "time" "github.com/gin-gonic/gin" "golang.org/x/net/websocket" ) +//Message resp struct +type Message struct { + Status int `json:"status"` + Payload interface{} `json:"payload"` +} + +//HTTPAPIServer start http server routes func HTTPAPIServer() { //Set HTTP API mode var public *gin.Engine @@ -38,6 +44,7 @@ func HTTPAPIServer() { privat.GET("/stream/:uuid/delete", HTTPAPIServerStreamDelete) privat.GET("/stream/:uuid/reload", HTTPAPIServerStreamReload) privat.GET("/stream/:uuid/info", HTTPAPIServerStreamInfo) + privat.GET("/stream/:uuid/codec", HTTPAPIServerStreamCodec) /* Stream video elements */ @@ -48,25 +55,6 @@ func HTTPAPIServer() { handler.ServeHTTP(c.Writer, c.Request) }) public.POST("/stream/:uuid/webrtc", HTTPAPIServerStreamWebRTC) - //TODO Fix It - public.GET("/codec/:uuid", func(c *gin.Context) { - c.Header("Access-Control-Allow-Origin", "*") - if Storage.StreamExist(c.Param("uuid")) { - codecs, _ := Storage.StreamCodecs(c.Param("uuid")) - if codecs == nil { - return - } - b, err := json.Marshal(codecs) - log.Println(string(b), err) - if err == nil { - _, err = c.Writer.Write(b) - if err == nil { - log.Println("Write Codec Info error", err) - return - } - } - } - }) /* Static HTML Files Demo Mode */ @@ -75,23 +63,21 @@ func HTTPAPIServer() { } err := public.Run(Storage.ServerHTTPPort()) if err != nil { - log.Fatalln(err) + loggingPrintln(Message{Status: 0, Payload: err.Error()}) + os.Exit(1) } } + +//HTTPAPIServerIndex index file func HTTPAPIServerIndex(c *gin.Context) { - //fi, all := Storage.List() - //sort.Strings(all) c.HTML(http.StatusOK, "index.tmpl", gin.H{ "port": Storage.ServerHTTPPort(), "streams": Storage.Streams, - // "uuid": fi, - // "uuidMap": all, "version": time.Now().String(), }) } -//ready //CrossOrigin Access-Control-Allow-Origin any methods func CrossOrigin() gin.HandlerFunc { return func(c *gin.Context) { diff --git a/apiHTTPServer.go b/apiHTTPServer.go index 06ab7d0..62b9c5b 100644 --- a/apiHTTPServer.go +++ b/apiHTTPServer.go @@ -1 +1,3 @@ package main + +//TODO add to next version diff --git a/apiHTTPStream.go b/apiHTTPStream.go index 07798cc..b6d4ba2 100644 --- a/apiHTTPStream.go +++ b/apiHTTPStream.go @@ -1,64 +1,95 @@ package main -import "github.com/gin-gonic/gin" +import ( + "github.com/gin-gonic/gin" +) +//HTTPAPIServerStreams function return stream list func HTTPAPIServerStreams(c *gin.Context) { - c.IndentedJSON(200, Storage.List()) + c.IndentedJSON(200, Message{Status: 1, Payload: Storage.List()}) } +//HTTPAPIServerStreamAdd function add new stream func HTTPAPIServerStreamAdd(c *gin.Context) { var payload StreamST err := c.BindJSON(&payload) if err != nil { - c.IndentedJSON(400, err) + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } err = Storage.StreamAdd(c.Param("uuid"), payload) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - c.IndentedJSON(200, "ok") + c.IndentedJSON(200, Message{Status: 1, Payload: Success}) } + +//HTTPAPIServerStreamEdit function edit stream func HTTPAPIServerStreamEdit(c *gin.Context) { - if !Storage.StreamExist(c.Param("uuid")) { - return - } var payload StreamST err := c.BindJSON(&payload) if err != nil { - c.IndentedJSON(400, err) + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } err = Storage.StreamEdit(c.Param("uuid"), payload) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - c.IndentedJSON(200, "ok") + c.IndentedJSON(200, Message{Status: 1, Payload: Success}) } + +//HTTPAPIServerStreamDelete function delete stream func HTTPAPIServerStreamDelete(c *gin.Context) { err := Storage.StreamDelete(c.Param("uuid")) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - c.IndentedJSON(200, "ok") + c.IndentedJSON(200, Message{Status: 1, Payload: Success}) } + +//HTTPAPIServerStreamDelete function reload stream func HTTPAPIServerStreamReload(c *gin.Context) { err := Storage.StreamReload(c.Param("uuid")) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - c.IndentedJSON(200, "ok") + c.IndentedJSON(200, Message{Status: 1, Payload: Success}) } +//HTTPAPIServerStreamInfo function return stream info struct func HTTPAPIServerStreamInfo(c *gin.Context) { info, err := Storage.StreamInfo(c.Param("uuid")) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) + return + } + c.IndentedJSON(200, Message{Status: 1, Payload: info}) +} + +//HTTPAPIServerStreamCodec function return codec info struct +func HTTPAPIServerStreamCodec(c *gin.Context) { + if !Storage.StreamExist(c.Param("uuid")) { + c.IndentedJSON(500, Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) + return + } + codecs, err := Storage.StreamCodecs(c.Param("uuid")) + if err != nil { + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - c.IndentedJSON(200, info) + c.IndentedJSON(200, Message{Status: 1, Payload: codecs}) } diff --git a/apiHTTPWebRTC.go b/apiHTTPWebRTC.go index 0f57114..5bf3ead 100644 --- a/apiHTTPWebRTC.go +++ b/apiHTTPWebRTC.go @@ -7,42 +7,48 @@ import ( "github.com/gin-gonic/gin" ) -//HTTPAPIServerStreamWebRTC need work +//HTTPAPIServerStreamWebRTC stream video over WebRTC func HTTPAPIServerStreamWebRTC(c *gin.Context) { - uuid := c.Param("uuid") - data := c.PostForm("data") - if !Storage.StreamExist(uuid) { - c.IndentedJSON(500, ErrorNotFound) + if !Storage.StreamExist(c.Param("uuid")) { + c.IndentedJSON(500, Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: ErrorStreamNotFound.Error()}) return } - Storage.StreamRun(uuid) - codecs, err := Storage.StreamCodecs(uuid) + Storage.StreamRun(c.Param("uuid")) + codecs, err := Storage.StreamCodecs(c.Param("uuid")) if err != nil { - c.IndentedJSON(500, err) + c.IndentedJSON(500, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } muxerWebRTC := webrtc.NewMuxer() - answer, err := muxerWebRTC.WriteHeader(codecs, data) + answer, err := muxerWebRTC.WriteHeader(codecs, c.PostForm("data")) if err != nil { - c.IndentedJSON(400, err) + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } _, err = c.Writer.Write([]byte(answer)) if err != nil { - c.IndentedJSON(400, err) + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } go func() { - cid, ch, err := Storage.ClientAdd(uuid) + cid, ch, err := Storage.ClientAdd(c.Param("uuid")) if err != nil { + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } - defer Storage.ClientDelete(uuid, cid) + defer Storage.ClientDelete(c.Param("uuid"), cid) var videoStart bool noVideo := time.NewTimer(10 * time.Second) for { select { case <-noVideo.C: + c.IndentedJSON(500, Message{Status: 0, Payload: ErrorStreamNoVideo.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: ErrorStreamNoVideo.Error()}) return case pck := <-ch: if pck.IsKeyFrame { @@ -54,6 +60,8 @@ func HTTPAPIServerStreamWebRTC(c *gin.Context) { } err = muxerWebRTC.WritePacket(*pck) if err != nil { + c.IndentedJSON(400, Message{Status: 0, Payload: err.Error()}) + loggingPrintln(c.Param("uuid"), Message{Status: 0, Payload: err.Error()}) return } } diff --git a/config.json b/config.json index 273582c..644f258 100644 --- a/config.json +++ b/config.json @@ -8,17 +8,26 @@ "http_port": ":8083" }, "streams": { + "346ee479-cdfa-45f8-b719-85253b549d72": { + "name": "Проверка", + "url": "rtsp://admin:123456@10.128.18.211/mpeg4", + "on_demand": false, + "debug": false, + "status": 0 + }, "demo1": { - "url": "rtsp://admin:admin@10.128.18.205/mpeg4", + "name": "Проверка 2 ", + "url": "rtsp://admin:123456@10.128.18.211/mpeg4", "on_demand": true, "debug": false, "status": 0 }, "demo2": { - "url": "rtsp://admin:admin@10.128.18.247:554/mpeg4", + "name": "Проверка 3", + "url": "rtsp://admin:123456@10.128.18.211/mpeg4", "on_demand": true, "debug": false, - "status": 1 + "status": 0 } } } \ No newline at end of file diff --git a/loggingLog.go b/loggingLog.go index 6f96b0e..2acace6 100644 --- a/loggingLog.go +++ b/loggingLog.go @@ -2,6 +2,7 @@ package main import "log" +//loggingPrintln logging function func loggingPrintln(v ...interface{}) { if debug { log.Println(v...) diff --git a/storageClient.go b/storageClient.go index 4bb6d1d..11e97b2 100644 --- a/storageClient.go +++ b/storageClient.go @@ -6,14 +6,13 @@ import ( "github.com/deepch/vdk/av" ) -//ready //ClientAdd Add New Client to Translations func (obj *StorageST) ClientAdd(uuid string) (string, chan *av.Packet, error) { obj.mutex.Lock() defer obj.mutex.Unlock() tmp, ok := obj.Streams[uuid] if !ok { - return "", nil, ErrorNotFound + return "", nil, ErrorStreamNotFound } //Generate UUID client cid, err := generateUUID() @@ -28,7 +27,6 @@ func (obj *StorageST) ClientAdd(uuid string) (string, chan *av.Packet, error) { } -//ready //ClientDelete Delete Client func (obj *StorageST) ClientDelete(uuid string, cid string) { obj.mutex.Lock() @@ -38,6 +36,7 @@ func (obj *StorageST) ClientDelete(uuid string, cid string) { } } +//ClientHas check is client ext func (obj *StorageST) ClientHas(uuid string) bool { obj.mutex.Lock() defer obj.mutex.Unlock() diff --git a/storageConfig.go b/storageConfig.go index fc0404a..935341d 100644 --- a/storageConfig.go +++ b/storageConfig.go @@ -4,12 +4,14 @@ import ( "encoding/json" "flag" "io/ioutil" - "log" + "os" "time" ) +//debug global var debug bool +//NewStreamCore do load config file func NewStreamCore() *StorageST { argConfigPatch := flag.String("config", "config.json", "config patch (/etc/server/config.json or config.json)") argDebug := flag.Bool("debug", true, "set debug mode") @@ -18,11 +20,13 @@ func NewStreamCore() *StorageST { var tmp StorageST data, err := ioutil.ReadFile(*argConfigPatch) if err != nil { - log.Fatalln(err) + loggingPrintln("Server config read error", err) + os.Exit(1) } err = json.Unmarshal(data, &tmp) if err != nil { - log.Fatalln(err) + loggingPrintln("Server config decode error", err) + os.Exit(1) } debug = tmp.Server.Debug for i, i2 := range tmp.Streams { @@ -34,11 +38,6 @@ func NewStreamCore() *StorageST { return &tmp } -/* - Client Sections -*/ - -//ready //ClientDelete Delete Client func (obj *StorageST) SaveConfig() error { res, err := json.MarshalIndent(obj, "", " ") diff --git a/storageServer.go b/storageServer.go index 12448ee..9386e2e 100644 --- a/storageServer.go +++ b/storageServer.go @@ -1,33 +1,34 @@ package main -/* - Server Sections -*/ - +//ServerHTTPDebug read debug options func (obj *StorageST) ServerHTTPDebug() bool { obj.mutex.RLock() defer obj.mutex.RUnlock() return obj.Server.HTTPDebug } +//ServerHTTPDebug read demo options func (obj *StorageST) ServerHTTPDemo() bool { obj.mutex.RLock() defer obj.mutex.RUnlock() return obj.Server.HTTPDemo } +//ServerHTTPDebug read Login options func (obj *StorageST) ServerHTTPLogin() string { obj.mutex.RLock() defer obj.mutex.RUnlock() return obj.Server.HTTPLogin } +//ServerHTTPDebug read Password options func (obj *StorageST) ServerHTTPPassword() string { obj.mutex.RLock() defer obj.mutex.RUnlock() return obj.Server.HTTPPassword } +//ServerHTTPDebug read HTTP Port options func (obj *StorageST) ServerHTTPPort() string { obj.mutex.RLock() defer obj.mutex.RUnlock() diff --git a/storageStream.go b/storageStream.go index d195092..30e80a0 100644 --- a/storageStream.go +++ b/storageStream.go @@ -6,10 +6,7 @@ import ( "github.com/deepch/vdk/av" ) -/* - Stream Sections -*/ - +//StreamExist check stream exist func (obj *StorageST) StreamExist(key string) bool { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -21,6 +18,7 @@ func (obj *StorageST) StreamExist(key string) bool { return false } +//StreamRunAll run all stream go func (obj *StorageST) StreamRunAll() { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -33,6 +31,7 @@ func (obj *StorageST) StreamRunAll() { } } +//StreamRun one stream and lock func (obj *StorageST) StreamRun(key string) { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -45,6 +44,7 @@ func (obj *StorageST) StreamRun(key string) { } } +//StreamUnlock unlock status to no lock func (obj *StorageST) StreamUnlock(key string) { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -54,15 +54,17 @@ func (obj *StorageST) StreamUnlock(key string) { } } +//StreamControl get stream func (obj *StorageST) StreamControl(key string) (*StreamST, error) { obj.mutex.Lock() defer obj.mutex.Unlock() if tmp, ok := obj.Streams[key]; ok { return &tmp, nil } - return nil, ErrorNotFound + return nil, ErrorStreamNotFound } +//List list all stream func (obj *StorageST) List() map[string]StreamST { obj.mutex.RLock() defer obj.mutex.RUnlock() @@ -73,12 +75,16 @@ func (obj *StorageST) List() map[string]StreamST { return tmp } +//StreamAdd add stream func (obj *StorageST) StreamAdd(uuid string, val StreamST) error { obj.mutex.Lock() defer obj.mutex.Unlock() if _, ok := obj.Streams[uuid]; ok { - return ErrorFound + return ErrorStreamAlreadyExists } + val.clients = make(map[string]ClientST) + val.ack = time.Now().Add(-255 * time.Hour) + val.hlsSegmentBuffer = make(map[int]Segment) obj.Streams[uuid] = val err := obj.SaveConfig() if err != nil { @@ -87,6 +93,7 @@ func (obj *StorageST) StreamAdd(uuid string, val StreamST) error { return nil } +//StreamAdd edit stream func (obj *StorageST) StreamEdit(uuid string, val StreamST) error { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -101,9 +108,10 @@ func (obj *StorageST) StreamEdit(uuid string, val StreamST) error { } return nil } - return ErrorNotFound + return ErrorStreamNotFound } +//StreamReload reload stream func (obj *StorageST) StreamReload(uuid string) error { obj.mutex.RLock() defer obj.mutex.RUnlock() @@ -113,9 +121,10 @@ func (obj *StorageST) StreamReload(uuid string) error { } return nil } - return ErrorNotFound + return ErrorStreamNotFound } +//StreamDelete stream func (obj *StorageST) StreamDelete(uuid string) error { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -130,18 +139,20 @@ func (obj *StorageST) StreamDelete(uuid string) error { } return nil } - return ErrorNotFound + return ErrorStreamNotFound } +//StreamInfo return stream info func (obj *StorageST) StreamInfo(uuid string) (*StreamST, error) { obj.mutex.RLock() defer obj.mutex.RUnlock() if tmp, ok := obj.Streams[uuid]; ok { return &tmp, nil } - return nil, ErrorNotFound + return nil, ErrorStreamNotFound } +//StreamCodecsUpdate update stream codec storage func (obj *StorageST) StreamCodecsUpdate(key string, val []av.CodecData) { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -151,36 +162,39 @@ func (obj *StorageST) StreamCodecsUpdate(key string, val []av.CodecData) { } } +//StreamCodecs get stream codec storage or wait func (obj *StorageST) StreamCodecs(key string) ([]av.CodecData, error) { for i := 0; i < 100; i++ { obj.mutex.RLock() tmp, ok := obj.Streams[key] obj.mutex.RUnlock() if !ok { - return nil, ErrorNotFound + return nil, ErrorStreamNotFound } if tmp.codecs != nil { return tmp.codecs, nil } time.Sleep(50 * time.Millisecond) } - return nil, ErrorCodecNotFound + return nil, ErrorStreamNotFound } -//ready //Cast broadcast stream func (obj *StorageST) Cast(key string, val *av.Packet) { obj.mutex.Lock() defer obj.mutex.Unlock() if tmp, ok := obj.Streams[key]; ok { if len(tmp.clients) > 0 { - for _, i2 := range tmp.clients { + for ic, i2 := range tmp.clients { if len(i2.outgoingPacket) < 1000 { i2.outgoingPacket <- val - } else { + } else if len(i2.signals) < 10 { //send stop signals to client i2.signals <- SignalStreamStop - i2.socket.Close() + err := i2.socket.Close() + if err != nil { + loggingPrintln(ic, "close client error", err) + } } } tmp.ack = time.Now() @@ -189,7 +203,6 @@ func (obj *StorageST) Cast(key string, val *av.Packet) { } } -//ready //StreamStatus change stream status func (obj *StorageST) StreamStatus(key string, val int) { obj.mutex.Lock() diff --git a/storageStreamHLS.go b/storageStreamHLS.go index 21c75f7..b07f3b8 100644 --- a/storageStreamHLS.go +++ b/storageStreamHLS.go @@ -8,6 +8,7 @@ import ( "github.com/deepch/vdk/av" ) +//StreamHLSAdd add hls seq to buffer func (obj *StorageST) StreamHLSAdd(uuid string, val []*av.Packet, dur time.Duration) { obj.mutex.Lock() defer obj.mutex.Unlock() @@ -20,11 +21,14 @@ func (obj *StorageST) StreamHLSAdd(uuid string, val []*av.Packet, dur time.Durat obj.Streams[uuid] = tmp } } + +//StreamHLSm3u8 get hls m3u8 list func (obj *StorageST) StreamHLSm3u8(uuid string) (string, int, error) { obj.mutex.RLock() defer obj.mutex.RUnlock() if tmp, ok := obj.Streams[uuid]; ok { var out string + //TODO fix it out += "#EXTM3U\r\n#EXT-X-TARGETDURATION:4\r\n#EXT-X-VERSION:4\r\n#EXT-X-MEDIA-SEQUENCE:" + strconv.Itoa(tmp.hlsSegmentNumber) + "\r\n" var keys []int for k := range tmp.hlsSegmentBuffer { @@ -39,10 +43,9 @@ func (obj *StorageST) StreamHLSm3u8(uuid string) (string, int, error) { } return out, count, nil } - return "", 0, ErrorNotFound + return "", 0, ErrorStreamNotFound } -//ready //StreamHLSTS send hls segment buffer to clients func (obj *StorageST) StreamHLSTS(key string, seq int) ([]*av.Packet, error) { obj.mutex.RLock() @@ -50,9 +53,10 @@ func (obj *StorageST) StreamHLSTS(key string, seq int) ([]*av.Packet, error) { if tmp, ok := obj.Streams[key].hlsSegmentBuffer[seq]; ok { return tmp.data, nil } - return nil, ErrorNotFound + return nil, ErrorStreamNotFound } +//StreamHLSFlush delete hls cache func (obj *StorageST) StreamHLSFlush(uuid string) { obj.mutex.Lock() defer obj.mutex.Unlock() diff --git a/storageStruct.go b/storageStruct.go index fddf0d0..bab1690 100644 --- a/storageStruct.go +++ b/storageStruct.go @@ -17,18 +17,27 @@ const ( ONLINE ) +//Default stream errors var ( - ErrorNotFound = errors.New("stream not found") - ErrorFound = errors.New("stream already exists") - ErrorCodecNotFound = errors.New("stream codec data not found") + Success = "success" + ErrorStreamNotFound = errors.New("stream not found") + ErrorStreamAlreadyExists = errors.New("stream already exists") + ErrorStreamNotHLSSegments = errors.New("stream hls not ts seq found") + ErrorStreamNoVideo = errors.New("stream no video") + ErrorStreamNoClients = errors.New("stream no clients") + ErrorStreamRestart = errors.New("stream restart") + ErrorStreamStopCoreSignal = errors.New("stream stop core signal") + ErrorStreamStopRTSPSignal = errors.New("stream stop rtsp signal") ) +//StorageST main storage struct type StorageST struct { mutex sync.RWMutex Server ServerST `json:"server"` Streams map[string]StreamST `json:"streams"` } +//ServerST server storage section type ServerST struct { Debug bool `json:"debug"` HTTPDemo bool `json:"http_demo"` @@ -38,6 +47,7 @@ type ServerST struct { HTTPPort string `json:"http_port"` } +//ServerST stream storage section type StreamST struct { Name string `json:"name"` URL string `json:"url"` @@ -53,6 +63,7 @@ type StreamST struct { ack time.Time } +//ClientST client storage section type ClientST struct { mode int signals chan int @@ -60,6 +71,7 @@ type ClientST struct { socket net.Conn } +//Segment HLS cache section type Segment struct { dur time.Duration data []*av.Packet diff --git a/streamCore.go b/streamCore.go index 3670474..8f975d3 100644 --- a/streamCore.go +++ b/streamCore.go @@ -1,7 +1,6 @@ package main import ( - "errors" "time" "github.com/deepch/vdk/av" @@ -19,25 +18,27 @@ func StreamServerRunStreamDo(name string) { return } if err != nil { - loggingPrintln("Stream Error", err) + loggingPrintln("Stream Error", err, "Restart Stream") } time.Sleep(2 * time.Second) } } + +//StreamServerRunStream core stream func StreamServerRunStream(name string) (bool, error) { keyTest := time.NewTimer(20 * time.Second) checkClients := time.NewTimer(20 * time.Second) Control, err := Storage.StreamControl(name) if err != nil { //TODO fix it - return true, ErrorNotFound + return true, ErrorStreamNotFound } var preKeyTS = time.Duration(0) var Seq []*av.Packet RTSPClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{URL: Control.URL, DisableAudio: true, DialTimeout: 3 * time.Second, ReadWriteTimeout: time.Second * 5 * time.Second, Debug: Control.Debug}) if err != nil { - return false, errors.New("RTSP Client Error " + err.Error()) + return false, err } Storage.StreamStatus(name, ONLINE) defer func() { @@ -45,43 +46,41 @@ func StreamServerRunStream(name string) (bool, error) { Storage.StreamStatus(name, OFFLINE) Storage.StreamHLSFlush(name) }() - - //if codec data recived if len(RTSPClient.CodecData) > 0 { Storage.StreamCodecsUpdate(name, RTSPClient.CodecData) } loggingPrintln("Stream", name, "success connection RTSP") for { select { - //Read no video timeout + //Check stream have clients case <-checkClients.C: if Control.OnDemand && !Storage.ClientHas(name) { - return true, errors.New("not clients on steam") + return true, ErrorStreamNoClients } checkClients.Reset(20 * time.Second) + //Check stream send key case <-keyTest.C: - return false, errors.New("Video Stream No Send Key Frame") + return false, ErrorStreamNoVideo //Read core signals case signals := <-Control.signals: switch signals { case SignalStreamStop: - return true, errors.New("Core Stop Signal") + return true, ErrorStreamStopCoreSignal case SignalStreamRestart: - return false, errors.New("Core Restart Signal") + return false, ErrorStreamRestart case SignalStreamClient: - loggingPrintln("New Viwer Signal") + return true, ErrorStreamNoClients } //Read rtsp signals case signals := <-RTSPClient.Signals: switch signals { case rtspv2.SignalCodecUpdate: - loggingPrintln("Update Code Info") Storage.StreamCodecsUpdate(name, RTSPClient.CodecData) case rtspv2.SignalStreamRTPStop: - return false, errors.New("RTSP Client Restart Signal") + return false, ErrorStreamStopRTSPSignal } case <-RTSPClient.OutgoingProxy: - //Add Raw Proxy Next Version + //TODO Add Raw Proxy Next Version case packet := <-RTSPClient.OutgoingPacket: if packet.IsKeyFrame { keyTest.Reset(20 * time.Second)