From c166810a302f37b14ac82a49c714cbf5c82b3e98 Mon Sep 17 00:00:00 2001 From: Eike David Lenz Date: Thu, 7 Mar 2024 11:57:01 +0100 Subject: [PATCH] improve log stream channel handling (#72) * handle closed log streaming channels * improve channel handling * fix tail and follow behaviour * move defer * use separate waitGroups * rename to previous * move func --- .gitignore | 1 + pkg/proc/basejob.go | 63 ++++++++++++++++++++++++++++----------- pkg/proc/job_common.go | 2 +- pkg/proc/runner_api_v1.go | 42 ++++++++++++++++---------- pkg/proc/types.go | 2 ++ 5 files changed, 76 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index bb073a0..57aa8e8 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /mittnitectl /dist examples/mittnite.d/local.hcl +cmd/mittnitectl/mittnitectl diff --git a/pkg/proc/basejob.go b/pkg/proc/basejob.go index 2347f24..8105f63 100644 --- a/pkg/proc/basejob.go +++ b/pkg/proc/basejob.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "path" + "sync" "syscall" "time" @@ -85,20 +86,20 @@ func (job *baseJob) StreamStdOut(ctx context.Context, outChan chan []byte, errCh if len(job.Config.Stdout) == 0 { return } - job.readStdFile(ctx, job.Config.Stdout, outChan, errChan, follow, tailLen) + job.readStdFile(ctx, &job.stdOutWg, job.Config.Stdout, outChan, errChan, follow, tailLen) } func (job *baseJob) StreamStdErr(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) { if len(job.Config.Stderr) == 0 { return } - job.readStdFile(ctx, job.Config.Stderr, outChan, errChan, follow, tailLen) + job.readStdFile(ctx, &job.stdErrWg, job.Config.Stderr, outChan, errChan, follow, tailLen) } -func (job *baseJob) StreamStdOutAndStdErr(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) { - job.StreamStdOut(ctx, outChan, errChan, follow, tailLen) +func (job *baseJob) StreamStdOutAndStdErr(ctx context.Context, outChan chan []byte, stdOutErrChan, stdErrErrChan chan error, follow bool, tailLen int) { + job.StreamStdOut(ctx, outChan, stdOutErrChan, follow, tailLen) if job.Config.Stdout != job.Config.Stderr { - job.StreamStdErr(ctx, outChan, errChan, follow, tailLen) + job.StreamStdErr(ctx, outChan, stdErrErrChan, follow, tailLen) } } @@ -196,27 +197,43 @@ func (job *baseJob) closeStdFiles() { } } -func (job *baseJob) readStdFile(ctx context.Context, filePath string, outChan chan []byte, errChan chan error, follow bool, tailLen int) { +func (job *baseJob) readStdFile(ctx context.Context, wg *sync.WaitGroup, filePath string, outChan chan []byte, errChan chan error, follow bool, tailLen int) { stdFile, err := os.OpenFile(filePath, os.O_RDONLY, 0o666) if err != nil { errChan <- err return } + defer stdFile.Close() - seekTail(tailLen, stdFile, outChan) + + seekTail(ctx, wg, tailLen, stdFile, outChan) read := func() { scanner := bufio.NewScanner(stdFile) + for scanner.Scan() { - line := scanner.Bytes() - outChan <- line + select { + case <-ctx.Done(): + return + case outChan <- scanner.Bytes(): + default: + if follow { + continue + } + return + } } + if err := scanner.Err(); err != nil { - errChan <- err - return + select { + case <-ctx.Done(): + return + case errChan <- err: + default: + return + } } } - for { select { default: @@ -225,13 +242,18 @@ func (job *baseJob) readStdFile(ctx context.Context, filePath string, outChan ch errChan <- io.EOF return } + + continue case <-ctx.Done(): return } } } -func seekTail(lines int, stdFile *os.File, outChan chan []byte) { +func seekTail(ctx context.Context, wg *sync.WaitGroup, lines int, stdFile *os.File, outChan chan []byte) { + wg.Add(1) + defer wg.Done() + if lines < 0 { return } @@ -251,12 +273,17 @@ func seekTail(lines int, stdFile *os.File, outChan chan []byte) { tailBuffer.PushBack(line) } for tailBuffer.Len() > 0 { - item := tailBuffer.Front() - line, ok := item.Value.([]byte) - if ok { - outChan <- line + select { + case <-ctx.Done(): + return + default: + item := tailBuffer.Front() + line, ok := item.Value.([]byte) + if ok { + outChan <- line + } + tailBuffer.Remove(item) } - tailBuffer.Remove(item) } } diff --git a/pkg/proc/job_common.go b/pkg/proc/job_common.go index e38ea2e..e3c376a 100644 --- a/pkg/proc/job_common.go +++ b/pkg/proc/job_common.go @@ -145,7 +145,7 @@ func (job *CommonJob) Watch() { for p := range job.watchingFiles { _, err := os.Stat(p) if os.IsNotExist(err) { - log.Infof("file %s changed, signalling process %s", p, job.Config.Name) + log.Infof("file %s not found, signalling process %s", p, job.Config.Name) delete(job.watchingFiles, p) signal = true } diff --git a/pkg/proc/runner_api_v1.go b/pkg/proc/runner_api_v1.go index 1609b48..e093670 100644 --- a/pkg/proc/runner_api_v1.go +++ b/pkg/proc/runner_api_v1.go @@ -12,6 +12,7 @@ import ( "net/http" "strconv" "strings" + "sync" "time" ) @@ -150,11 +151,13 @@ func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) { streamCtx, cancel := context.WithCancel(context.Background()) outChan := make(chan []byte) - errChan := make(chan error) + stdOutErrChan := make(chan error) + stdErrErrChan := make(chan error) defer func() { cancel() close(outChan) - close(errChan) + close(stdOutErrChan) + close(stdErrErrChan) }() // handle client disconnects @@ -170,18 +173,12 @@ func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) { tailLen = -1 } - go job.StreamStdOutAndStdErr(streamCtx, outChan, errChan, follow, tailLen) + go job.StreamStdOutAndStdErr(streamCtx, outChan, stdOutErrChan, stdErrErrChan, follow, tailLen) - for { - select { - case logLine := <-outChan: - if err := conn.WriteMessage(websocket.TextMessage, logLine); err != nil { - break - } - - case err = <-errChan: - if errors.Is(err, io.EOF) { - err = conn.WriteControl( + handleErr := func(err error, wg *sync.WaitGroup) { + if errors.Is(err, io.EOF) { + if !follow { + err := conn.WriteControl( websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "EOF"), time.Now().Add(time.Second), @@ -190,9 +187,24 @@ func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) { return } } + return + } else { log.WithField("job.name", job.Config.Name). - Error(fmt.Sprintf("error during logs streaming: %s", err.Error())) - break + Error(fmt.Sprintf("error while streaming logs from stdout: %s", err.Error())) + } + } + + for { + select { + case logLine := <-outChan: + if err := conn.WriteMessage(websocket.TextMessage, logLine); err != nil { + break + } + + case err := <-stdOutErrChan: + handleErr(err, &job.stdOutWg) + case err := <-stdErrErrChan: + handleErr(err, &job.stdErrWg) case <-streamCtx.Done(): return diff --git a/pkg/proc/types.go b/pkg/proc/types.go index 5dea34d..05efc11 100644 --- a/pkg/proc/types.go +++ b/pkg/proc/types.go @@ -52,6 +52,8 @@ type baseJob struct { ctx context.Context interrupt context.CancelFunc + stdErrWg sync.WaitGroup + stdOutWg sync.WaitGroup cmd *exec.Cmd restart bool