diff --git a/cmd.go b/cmd.go index ac989b3..0b13bf4 100644 --- a/cmd.go +++ b/cmd.go @@ -454,7 +454,9 @@ func (c *Cmd) run(in io.Reader) { if c.stdoutStream != nil { defer func() { c.stdoutStream.Flush() + c.stdoutStream.Stop() c.stderrStream.Flush() + c.stderrStream.Stop() // exec.Cmd.Wait has already waited for all output: // Otherwise, during the execution of the command a separate goroutine // reads from the process over a pipe and delivers that data to the @@ -635,6 +637,8 @@ const ( // performance impact if too small by causing OutputStream.Write to block // excessively. DEFAULT_STREAM_CHAN_SIZE = 1000 + + DEFAULT_FLUSH_INTERVAL = 3 * time.Second ) // ErrLineBufferOverflow is returned by OutputStream.Write when the internal @@ -682,10 +686,11 @@ func (e ErrLineBufferOverflow) Error() string { // While runnableCmd is running, lines are sent to the channel as soon as they // are written and newline-terminated by the command. type OutputStream struct { - streamChan chan string - bufSize int - buf []byte - lastChar int + streamChan chan string + bufSize int + buf []byte + lastChar int + flushTicker *time.Ticker } // NewOutputStream creates a new streaming output on the given channel. The @@ -695,10 +700,12 @@ func NewOutputStream(streamChan chan string) *OutputStream { out := &OutputStream{ streamChan: streamChan, // -- - bufSize: DEFAULT_LINE_BUFFER_SIZE, - buf: make([]byte, DEFAULT_LINE_BUFFER_SIZE), - lastChar: 0, + bufSize: DEFAULT_LINE_BUFFER_SIZE, + buf: make([]byte, DEFAULT_LINE_BUFFER_SIZE), + lastChar: 0, + flushTicker: time.NewTicker(DEFAULT_FLUSH_INTERVAL), } + go out.autoFlush() // 启动自动刷新协程 return out } @@ -783,5 +790,18 @@ func (rw *OutputStream) Flush() { if rw.lastChar > 0 { line := string(rw.buf[0:rw.lastChar]) rw.streamChan <- line + rw.lastChar = 0 // 清空缓冲区 + } +} + +// autoFlush is a goroutine that automatically flushes the buffer if no new data is received within the timeout period. +func (rw *OutputStream) autoFlush() { + for range rw.flushTicker.C { + rw.Flush() } } + +// Stop stops the autoFlush goroutine and releases resources. +func (rw *OutputStream) Stop() { + rw.flushTicker.Stop() +}