Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Line breaks and buffer overflows (broken pipes) #106

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ func (c *Cmd) run(in io.Reader) {
if c.stdoutStream != nil {
defer func() {
c.stdoutStream.Flush()
c.stdoutStream.Stop()
c.stderrStream.Flush()
c.stderrStream.Stop()
// exec.Cmd.Wait has already waited for all output:
// Otherwise, during the execution of the command a separate goroutine
// reads from the process over a pipe and delivers that data to the
Expand Down Expand Up @@ -635,6 +637,8 @@ const (
// performance impact if too small by causing OutputStream.Write to block
// excessively.
DEFAULT_STREAM_CHAN_SIZE = 1000

DEFAULT_FLUSH_INTERVAL = 3 * time.Second
)

// ErrLineBufferOverflow is returned by OutputStream.Write when the internal
Expand Down Expand Up @@ -682,10 +686,11 @@ func (e ErrLineBufferOverflow) Error() string {
// While runnableCmd is running, lines are sent to the channel as soon as they
// are written and newline-terminated by the command.
type OutputStream struct {
streamChan chan string
bufSize int
buf []byte
lastChar int
streamChan chan string
bufSize int
buf []byte
lastChar int
flushTicker *time.Ticker
}

// NewOutputStream creates a new streaming output on the given channel. The
Expand All @@ -695,10 +700,12 @@ func NewOutputStream(streamChan chan string) *OutputStream {
out := &OutputStream{
streamChan: streamChan,
// --
bufSize: DEFAULT_LINE_BUFFER_SIZE,
buf: make([]byte, DEFAULT_LINE_BUFFER_SIZE),
lastChar: 0,
bufSize: DEFAULT_LINE_BUFFER_SIZE,
buf: make([]byte, DEFAULT_LINE_BUFFER_SIZE),
lastChar: 0,
flushTicker: time.NewTicker(DEFAULT_FLUSH_INTERVAL),
}
go out.autoFlush() // 启动自动刷新协程
return out
}

Expand Down Expand Up @@ -783,5 +790,18 @@ func (rw *OutputStream) Flush() {
if rw.lastChar > 0 {
line := string(rw.buf[0:rw.lastChar])
rw.streamChan <- line
rw.lastChar = 0 // 清空缓冲区
}
}

// autoFlush is a goroutine that automatically flushes the buffer if no new data is received within the timeout period.
func (rw *OutputStream) autoFlush() {
for range rw.flushTicker.C {
rw.Flush()
}
}

// Stop stops the autoFlush goroutine and releases resources.
func (rw *OutputStream) Stop() {
rw.flushTicker.Stop()
}