From 47a63ce2390ebc1fd19b53b6ba7037997272d04f Mon Sep 17 00:00:00 2001 From: emranemran Date: Wed, 29 Nov 2023 14:53:54 -0800 Subject: [PATCH 1/6] wip: channel works --- transcode/transcode.go | 52 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index d187b63a..49b34827 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -53,6 +53,11 @@ type TranscodeSegmentRequest struct { IsClip bool } +type TranscodedSegmentInfo struct { + RenditionName string + SegmentIndex int +} + func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) { log.AddContext(transcodeRequest.RequestID, "source_manifest", transcodeRequest.SourceManifestURL, "stream_name", streamName) log.Log(transcodeRequest.RequestID, "RunTranscodeProcess (v2) Beginning") @@ -140,6 +145,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st }) } else { for _, profile := range transcodeProfiles { + renditionList.AddRenditionSegment(profile.Name, &video.TSegmentList{ SegmentDataTable: make(map[int][]byte), @@ -148,9 +154,18 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } } + // Create a buffered channel for segment information + segmentChannel := make(chan TranscodedSegmentInfo, 10) // Buffer size of 10 + + var wg sync.WaitGroup + var jobs *ParallelTranscoding jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { - err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster) + err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel) + + wg.Add(1) // Increment the WaitGroup counter + defer wg.Done() // Decrement the counter when the function exits + segmentsCount++ if err != nil { return err @@ -162,12 +177,40 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } return nil }) + + // Start the disk-writing goroutine + go func() { + var segmentBatch []TranscodedSegmentInfo + + for segInfo := range segmentChannel { + segmentBatch = append(segmentBatch, segInfo) + if len(segmentBatch) >= 1 { + //writeSegmentsToDisk(segmentBatch) + fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex) + //access and delete via mutexes + segmentBatch = nil + } + } + // Handle any remaining segments after the channel is closed + if len(segmentBatch) > 0 { + //writeSegmentsToDisk(segmentBatch) + fmt.Println("XXX: writing to disk here after channel close") + } + }() + jobs.Start() if err = jobs.Wait(); err != nil { // return first error to caller return outputs, segmentsCount, err } + // Wait for all transcoding jobs to complete + wg.Wait() + // Close the channel to signal that no more segments will be sent + close(segmentChannel) + + fmt.Println("XXX: DONE") + // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) if err != nil { @@ -432,6 +475,7 @@ func transcodeSegment( transcodedStats []*video.RenditionStats, renditionList *video.TRenditionList, broadcaster clients.BroadcasterClient, + segmentChannel chan<- TranscodedSegmentInfo, ) error { start := time.Now() @@ -516,6 +560,12 @@ func transcodeSegment( // be generated i.e. all profiles for mp4 inputs and only highest quality // rendition for hls inputs like recordings. segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData) + + segmentChannel <- TranscodedSegmentInfo{ + RenditionName: transcodedSegment.Name, // Use actual rendition name + SegmentIndex: segment.Index, // Use actual segment index + } + } } From 1643d85f946d4a6fbb8b181581ecb68953bf32a9 Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 30 Nov 2023 15:08:38 -0800 Subject: [PATCH 2/6] write to file works --- transcode/transcode.go | 70 +++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 49b34827..92edc31d 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -54,6 +55,7 @@ type TranscodeSegmentRequest struct { } type TranscodedSegmentInfo struct { + RequestID string RenditionName string SegmentIndex int } @@ -178,25 +180,35 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return nil }) - // Start the disk-writing goroutine - go func() { - var segmentBatch []TranscodedSegmentInfo - - for segInfo := range segmentChannel { - segmentBatch = append(segmentBatch, segInfo) - if len(segmentBatch) >= 1 { - //writeSegmentsToDisk(segmentBatch) - fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex) - //access and delete via mutexes - segmentBatch = nil - } - } - // Handle any remaining segments after the channel is closed - if len(segmentBatch) > 0 { - //writeSegmentsToDisk(segmentBatch) - fmt.Println("XXX: writing to disk here after channel close") + if transcodeRequest.GenerateMP4 { + // Create folder to hold transmux-ed files in local storage temporarily + TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") + if err != nil && !os.IsExist(err) { + log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) + return outputs, segmentsCount, err } - }() + defer os.RemoveAll(TransmuxStorageDir) + + // Start the disk-writing goroutine + go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) { + var segmentBatch []TranscodedSegmentInfo + + for segInfo := range segmentChannel { + segmentBatch = append(segmentBatch, segInfo) + if len(segmentBatch) >= 1 { + writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) + fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex) + //access and delete via mutexes + segmentBatch = nil + } + } + // Handle any remaining segments after the channel is closed + if len(segmentBatch) > 0 { + writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) + fmt.Println("XXX: writing to disk here after channel close") + } + }(TransmuxStorageDir, &renditionList) + } jobs.Start() if err = jobs.Wait(); err != nil { @@ -210,6 +222,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st close(segmentChannel) fmt.Println("XXX: DONE") +return outputs, segmentsCount, err // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) @@ -402,6 +415,26 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, nil } +func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRenditionList, segmentBatch []TranscodedSegmentInfo) (int64, error) { + for _, segInfo := range segmentBatch { + + segmentList := renditionList.GetSegmentList(segInfo.RenditionName) + segmentData := segmentList.GetSegment(segInfo.SegmentIndex) + segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts") + segmentFile, err := os.Create(segmentFilename) + if err != nil { + return 0, fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err) + } + defer segmentFile.Close() + _, err = segmentFile.Write(segmentData) + if err != nil { + return 0, fmt.Errorf("error writing segment err: %w", err) + } + + } + return 0, nil +} + func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) { var mp4OutputsPre []video.OutputVideoFile // e. Upload all mp4 related output files @@ -562,6 +595,7 @@ func transcodeSegment( segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData) segmentChannel <- TranscodedSegmentInfo{ + RequestID: transcodeRequest.RequestID, RenditionName: transcodedSegment.Name, // Use actual rendition name SegmentIndex: segment.Index, // Use actual segment index } From 8ade76d1a1768ed8896ec0560e1defbb4d6165b1 Mon Sep 17 00:00:00 2001 From: emranemran Date: Sun, 3 Dec 2023 17:07:02 -0800 Subject: [PATCH 3/6] transmux/concat fixxes --- transcode/transcode.go | 20 +++++++++----------- video/transmux.go | 11 +++++++++-- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 92edc31d..94ce653b 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -23,7 +23,6 @@ import ( const ( UploadTimeout = 5 * time.Minute - TransmuxStorageDir = "/tmp/transmux_stage" ) type TranscodeSegmentRequest struct { @@ -180,9 +179,11 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return nil }) + var TransmuxStorageDir string if transcodeRequest.GenerateMP4 { + var err error // Create folder to hold transmux-ed files in local storage temporarily - TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") + TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_" + transcodeRequest.RequestID + "_") if err != nil && !os.IsExist(err) { log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) return outputs, segmentsCount, err @@ -221,9 +222,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st // Close the channel to signal that no more segments will be sent close(segmentChannel) - fmt.Println("XXX: DONE") -return outputs, segmentsCount, err - // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) if err != nil { @@ -250,13 +248,13 @@ return outputs, segmentsCount, err var concatFiles []string for rendition, segments := range renditionList.RenditionSegmentTable { // Create folder to hold transmux-ed files in local storage temporarily - TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") +/* TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") if err != nil && !os.IsExist(err) { log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) return outputs, segmentsCount, err } defer os.RemoveAll(TransmuxStorageDir) - +*/ // Create a single .ts file for a given rendition by concatenating all segments in order if rendition == "low-bitrate" { // skip mp4 generation for low-bitrate profile @@ -269,11 +267,11 @@ return outputs, segmentsCount, err // For now, use the stream based concat for clipping only and file based concat for everything else. // Eventually, all mp4 generation can be moved to stream based concat once proven effective. var totalBytes int64 - if transcodeRequest.IsClip { +// if transcodeRequest.IsClip { totalBytes, err = video.ConcatTS(concatTsFileName, segments, true) - } else { - totalBytes, err = video.ConcatTS(concatTsFileName, segments, false) - } +// } else { +// totalBytes, err = video.ConcatTS(concatTsFileName, segments, false) +// } if err != nil { log.Log(transcodeRequest.RequestID, "error concatenating .ts", "file", concatTsFileName, "err", err) continue diff --git a/video/transmux.go b/video/transmux.go index 84125f8c..3f19518c 100644 --- a/video/transmux.go +++ b/video/transmux.go @@ -138,10 +138,10 @@ func ConcatTS(tsFileName string, segmentsList *TSegmentList, useStreamBasedConca }() // Write each segment to disk and add segment filename to the text file - for segName, segData := range segmentsList.GetSortedSegments() { + for segName, _ := range segmentsList.GetSortedSegments() { // Open a new file to write each segment to disk segmentFilename := fileBaseWithoutExt + "_" + strconv.Itoa(segName) + ".ts" - segmentFile, err := os.Create(segmentFilename) +/* segmentFile, err := os.Create(segmentFilename) if err != nil { return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err) } @@ -151,6 +151,13 @@ func ConcatTS(tsFileName string, segmentsList *TSegmentList, useStreamBasedConca if err != nil { return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err) } +*/ + fileInfo, err := os.Stat(segmentFilename) + if err != nil { + return totalBytes, fmt.Errorf("error stat segment %d err: %w", segName, err) + } + segBytes := fileInfo.Size() + segmentFilenames = append(segmentFilenames, segmentFilename) totalBytes = totalBytes + int64(segBytes) // Add filename to the text file From 3d88e44903656033e0d68d643f03d7b8bfb8d59a Mon Sep 17 00:00:00 2001 From: emranemran Date: Sun, 3 Dec 2023 18:22:31 -0800 Subject: [PATCH 4/6] works --- transcode/transcode.go | 30 +++++++++++++++--------------- video/transmux.go | 22 +++++++++++----------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 94ce653b..24f5d886 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -23,6 +23,7 @@ import ( const ( UploadTimeout = 5 * time.Minute + SegmentChannelSize = 10 ) type TranscodeSegmentRequest struct { @@ -146,7 +147,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st }) } else { for _, profile := range transcodeProfiles { - renditionList.AddRenditionSegment(profile.Name, &video.TSegmentList{ SegmentDataTable: make(map[int][]byte), @@ -156,7 +156,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } // Create a buffered channel for segment information - segmentChannel := make(chan TranscodedSegmentInfo, 10) // Buffer size of 10 + segmentChannel := make(chan TranscodedSegmentInfo, SegmentChannelSize) var wg sync.WaitGroup @@ -183,7 +183,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st if transcodeRequest.GenerateMP4 { var err error // Create folder to hold transmux-ed files in local storage temporarily - TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_" + transcodeRequest.RequestID + "_") + TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_") if err != nil && !os.IsExist(err) { log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) return outputs, segmentsCount, err @@ -248,13 +248,13 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st var concatFiles []string for rendition, segments := range renditionList.RenditionSegmentTable { // Create folder to hold transmux-ed files in local storage temporarily -/* TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") - if err != nil && !os.IsExist(err) { - log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) - return outputs, segmentsCount, err - } - defer os.RemoveAll(TransmuxStorageDir) -*/ + /* TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") + if err != nil && !os.IsExist(err) { + log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) + return outputs, segmentsCount, err + } + defer os.RemoveAll(TransmuxStorageDir) + */ // Create a single .ts file for a given rendition by concatenating all segments in order if rendition == "low-bitrate" { // skip mp4 generation for low-bitrate profile @@ -267,11 +267,11 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st // For now, use the stream based concat for clipping only and file based concat for everything else. // Eventually, all mp4 generation can be moved to stream based concat once proven effective. var totalBytes int64 -// if transcodeRequest.IsClip { - totalBytes, err = video.ConcatTS(concatTsFileName, segments, true) -// } else { -// totalBytes, err = video.ConcatTS(concatTsFileName, segments, false) -// } + // if transcodeRequest.IsClip { + totalBytes, err = video.ConcatTS(concatTsFileName, segments, true) + // } else { + // totalBytes, err = video.ConcatTS(concatTsFileName, segments, false) + // } if err != nil { log.Log(transcodeRequest.RequestID, "error concatenating .ts", "file", concatTsFileName, "err", err) continue diff --git a/video/transmux.go b/video/transmux.go index 3f19518c..48c863b1 100644 --- a/video/transmux.go +++ b/video/transmux.go @@ -141,17 +141,17 @@ func ConcatTS(tsFileName string, segmentsList *TSegmentList, useStreamBasedConca for segName, _ := range segmentsList.GetSortedSegments() { // Open a new file to write each segment to disk segmentFilename := fileBaseWithoutExt + "_" + strconv.Itoa(segName) + ".ts" -/* segmentFile, err := os.Create(segmentFilename) - if err != nil { - return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err) - } - defer segmentFile.Close() - // Write the segment data to disk - segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData]) - if err != nil { - return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err) - } -*/ + /* segmentFile, err := os.Create(segmentFilename) + if err != nil { + return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err) + } + defer segmentFile.Close() + // Write the segment data to disk + segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData]) + if err != nil { + return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err) + } + */ fileInfo, err := os.Stat(segmentFilename) if err != nil { return totalBytes, fmt.Errorf("error stat segment %d err: %w", segName, err) From 6d1731ce08e4988576930b0754db9b2e85c43db0 Mon Sep 17 00:00:00 2001 From: emranemran Date: Sun, 3 Dec 2023 19:41:40 -0800 Subject: [PATCH 5/6] fixes-works --- transcode/transcode.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index 24f5d886..f3191b6c 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -164,8 +164,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel) - wg.Add(1) // Increment the WaitGroup counter - defer wg.Done() // Decrement the counter when the function exits segmentsCount++ if err != nil { @@ -191,12 +189,14 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st defer os.RemoveAll(TransmuxStorageDir) // Start the disk-writing goroutine + wg.Add(1) // Increment the WaitGroup counter go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) { var segmentBatch []TranscodedSegmentInfo + defer wg.Done() for segInfo := range segmentChannel { segmentBatch = append(segmentBatch, segInfo) - if len(segmentBatch) >= 1 { + if len(segmentBatch) >= 5 { writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex) //access and delete via mutexes @@ -217,10 +217,9 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, err } - // Wait for all transcoding jobs to complete - wg.Wait() // Close the channel to signal that no more segments will be sent close(segmentChannel) + wg.Wait() // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) From ef485b04cbb0fba71e9f32601122e022f44a26e3 Mon Sep 17 00:00:00 2001 From: emranemran Date: Sun, 3 Dec 2023 20:11:27 -0800 Subject: [PATCH 6/6] fix --- transcode/transcode.go | 39 +++++++++++++++++---------------------- video/media.go | 6 ++++++ 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/transcode/transcode.go b/transcode/transcode.go index f3191b6c..ba67f27c 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -155,16 +155,16 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } } - // Create a buffered channel for segment information + // Create a buffered channel where transcoded segments are sent to be written to disk segmentChannel := make(chan TranscodedSegmentInfo, SegmentChannelSize) + // Create a waitgroup to synchronize when the disk writing goroutine finishes var wg sync.WaitGroup + // Setup parallel transcode sessions var jobs *ParallelTranscoding jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel) - - segmentsCount++ if err != nil { return err @@ -188,18 +188,19 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } defer os.RemoveAll(TransmuxStorageDir) - // Start the disk-writing goroutine - wg.Add(1) // Increment the WaitGroup counter + // Start the disk-writing (consumer) goroutine + wg.Add(1) go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) { var segmentBatch []TranscodedSegmentInfo defer wg.Done() + // Keep checking for new segments in the buffered channel for segInfo := range segmentChannel { segmentBatch = append(segmentBatch, segInfo) - if len(segmentBatch) >= 5 { + // Begin writing to disk if at-least 50% of buffered channel is full + if len(segmentBatch) >= SegmentChannelSize/2 { writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex) - //access and delete via mutexes segmentBatch = nil } } @@ -211,14 +212,17 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st }(TransmuxStorageDir, &renditionList) } + // Start the transcoding (producer) goroutines jobs.Start() if err = jobs.Wait(); err != nil { // return first error to caller return outputs, segmentsCount, err } - // Close the channel to signal that no more segments will be sent + // If the disk-writing gorouine was started, then close the segment channel to + // signal that no more segments will be sent. This will be a no-op if MP4s are not requested. close(segmentChannel) + // Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested. wg.Wait() // Build the manifests and push them to storage @@ -246,14 +250,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st var concatFiles []string for rendition, segments := range renditionList.RenditionSegmentTable { - // Create folder to hold transmux-ed files in local storage temporarily - /* TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") - if err != nil && !os.IsExist(err) { - log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) - return outputs, segmentsCount, err - } - defer os.RemoveAll(TransmuxStorageDir) - */ // Create a single .ts file for a given rendition by concatenating all segments in order if rendition == "low-bitrate" { // skip mp4 generation for low-bitrate profile @@ -263,14 +259,8 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st concatFiles = append(concatFiles, concatTsFileName) defer os.Remove(concatTsFileName) - // For now, use the stream based concat for clipping only and file based concat for everything else. - // Eventually, all mp4 generation can be moved to stream based concat once proven effective. var totalBytes int64 - // if transcodeRequest.IsClip { totalBytes, err = video.ConcatTS(concatTsFileName, segments, true) - // } else { - // totalBytes, err = video.ConcatTS(concatTsFileName, segments, false) - // } if err != nil { log.Log(transcodeRequest.RequestID, "error concatenating .ts", "file", concatTsFileName, "err", err) continue @@ -415,6 +405,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRenditionList, segmentBatch []TranscodedSegmentInfo) (int64, error) { for _, segInfo := range segmentBatch { + // All accesses to renditionList and segmentList is protected by a mutex behind the scenes segmentList := renditionList.GetSegmentList(segInfo.RenditionName) segmentData := segmentList.GetSegment(segInfo.SegmentIndex) segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts") @@ -427,6 +418,8 @@ func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRendi if err != nil { return 0, fmt.Errorf("error writing segment err: %w", err) } + // "Delete" buffered segment data from memory in hopes the garbage-collector releases it + segmentList.RemoveSegmentData(segInfo.SegmentIndex) } return 0, nil @@ -591,6 +584,8 @@ func transcodeSegment( // rendition for hls inputs like recordings. segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData) + // send this transcoded segment to the segment channel so that it can be written + // to disk in parallel segmentChannel <- TranscodedSegmentInfo{ RequestID: transcodeRequest.RequestID, RenditionName: transcodedSegment.Name, // Use actual rendition name diff --git a/video/media.go b/video/media.go index 71d1c7c4..1eba7173 100644 --- a/video/media.go +++ b/video/media.go @@ -39,6 +39,12 @@ func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) { s.mu.Unlock() } +func (s *TSegmentList) RemoveSegmentData(segIdx int) { + s.mu.Lock() + s.SegmentDataTable[segIdx] = []byte{} + s.mu.Unlock() +} + func (s *TSegmentList) GetSegment(segIdx int) []byte { s.mu.Lock() defer s.mu.Unlock()