diff --git a/process/process.go b/process/process.go index 720d060..c9dc9cc 100644 --- a/process/process.go +++ b/process/process.go @@ -31,6 +31,7 @@ import ( "os/exec" "runtime" "sort" + "strconv" "strings" "sync" "syscall" @@ -39,6 +40,7 @@ import ( "github.com/cznic/sortutil" "github.com/pkg/errors" "github.com/shenwei356/go-logging" + psutil "github.com/shirou/gopsutil/process" ) // Log is *logging.Logger @@ -73,7 +75,8 @@ type Command struct { Err error // Error Duration time.Duration // runtime - dryrun bool + dryrun bool + exitStatus int } // NewCommand create a Command @@ -104,20 +107,22 @@ var TmpOutputDataBuffer = 1048576 // 1M var OutputChunkSize = 16384 // 16K // Run runs a command and send output to command.Ch in background. -func (c *Command) Run() error { - c.Ch = make(chan string, 1) +func (c *Command) Run(opts *Options) (chan string, error) { + // create a return chan here; we will set the c.Ch in the parent + ch := make(chan string, 1) if c.dryrun { - c.Ch <- c.Cmd + "\n" - close(c.Ch) + ch <- c.Cmd + "\n" + close(ch) c.finishSendOutput = true - return nil + return ch, nil } - c.Err = c.run() - if c.Err != nil { - return c.Err - } + c.Err = c.run(opts) + + // don't return here, keep going so we can display + // the output from commands that error + var readErr error = nil if Verbose { Log.Infof("finish cmd #%d in %s: %s", c.ID, c.Duration, c.Cmd) @@ -136,21 +141,26 @@ func (c *Command) Run() error { var existedN int // var N uint64 for { - n, c.Err = c.reader.Read(buf) + if c.reader != nil { + n, readErr = c.reader.Read(buf) + } else { + n = 0 + readErr = io.EOF + } existedN = b.Len() b.Write(buf[0:n]) - if c.Err != nil { - if c.Err == io.EOF { + if readErr != nil { + if readErr == io.EOF { if b.Len() > 0 { // if Verbose { // N += uint64(b.Len()) // } - c.Ch <- b.String() // string(buf[0:n]) + ch <- b.String() // string(buf[0:n]) } b.Reset() - c.Err = nil + readErr = nil } break } @@ -164,7 +174,7 @@ func (c *Command) Run() error { // if Verbose { // N += uint64(len(bb[0 : i+1])) // } - c.Ch <- string(bb[0 : i+1]) // string(buf[0:n]) + ch <- string(bb[0 : i+1]) // string(buf[0:n]) b.Reset() if i-existedN+1 < n { @@ -184,10 +194,18 @@ func (c *Command) Run() error { // Log.Infof("finish reading data from: %s", c.Cmd) // } - close(c.Ch) + close(ch) c.finishSendOutput = true }() - return nil + if c.Err != nil { + return ch, c.Err + } else { + if readErr != nil { + return ch, readErr + } else { + return ch, nil + } + } } var isWindows bool = runtime.GOOS == "windows" @@ -230,29 +248,66 @@ func (c *Command) Cleanup() error { return err } -// ExitCode returns the exit code associated with a given error -func (c *Command) ExitCode() int { - if c.Err == nil { - return 0 - } - if ex, ok := c.Err.(*exec.ExitError); ok { - if st, ok := ex.Sys().(syscall.WaitStatus); ok { - return st.ExitStatus() - } - } - return 1 -} - // ErrTimeout means command timeout var ErrTimeout = fmt.Errorf("time out") // ErrCancelled means command being cancelled var ErrCancelled = fmt.Errorf("cancelled") +func (c *Command) getExitStatus(err error) int { + if exitError, ok := err.(*exec.ExitError); ok { + waitStatus := exitError.Sys().(syscall.WaitStatus) + return waitStatus.ExitStatus() + } + // no error, so return exitStatus 0 + return 0 +} + +func isProcessRunning(pid int) bool { + _, err := os.FindProcess(pid) + if err != nil { + return false + } + return true +} + +// ensure Windows processes go away +func killWindowsProcessTreeRecursive(childProcess *psutil.Process) { + grandChildren, err := childProcess.Children() + if grandChildren != nil && err == nil { + for _, value := range grandChildren { + killWindowsProcessTreeRecursive(value) + } + } + attempts := 1 + for { + if Verbose { + Log.Infof("taskkill /t /f /pid %s", strconv.Itoa(int(childProcess.Pid))) + } + out, err := exec.Command("taskkill", "/t", "/f", "/pid", strconv.Itoa(int(childProcess.Pid))).Output() + if Verbose { + if err != nil { + Log.Error(err) + } + Log.Infof("%s", out) + } + + if !isProcessRunning(int(childProcess.Pid)) { + break + } else { + time.Sleep(10 * time.Millisecond) + attempts += 1 + if attempts > 30 { + break + } + } + } +} + // run a command and pass output to c.reader. // Note that output returns only after finishing run. // This function is mainly borrowed from https://github.com/brentp/gargs . -func (c *Command) run() error { +func (c *Command) run(opts *Options) error { t := time.Now() chCancelMonitor := make(chan struct{}) defer func() { @@ -269,13 +324,15 @@ func (c *Command) run() error { if c.Timeout > 0 { c.ctx, c.ctxCancel = context.WithTimeout(context.Background(), c.Timeout) if isWindows { - command = exec.CommandContext(c.ctx, getShell(), "/c", qcmd) + command = exec.CommandContext(c.ctx, getShell()) + c.setWindowsCommandAttr(command, qcmd) } else { command = exec.CommandContext(c.ctx, getShell(), "-c", qcmd) } } else { if isWindows { - command = exec.Command(getShell(), "/c", qcmd) + command = exec.Command(getShell()) + c.setWindowsCommandAttr(command, qcmd) } else { command = exec.Command(getShell(), "-c", qcmd) } @@ -306,7 +363,17 @@ func (c *Command) run() error { Log.Warningf("cancel cmd #%d: %s", c.ID, c.Cmd) } chErr <- ErrCancelled - command.Process.Kill() + if opts.KillOnCtrlC { + if isWindows { + childProcess, err := psutil.NewProcess(int32(command.Process.Pid)) + if err != nil { + Log.Error(err) + } + killWindowsProcessTreeRecursive(childProcess) + } else { + command.Process.Kill() + } + } case <-chCancelMonitor: // default: // must not use default, if you must use, use for loop } @@ -359,10 +426,14 @@ func (c *Command) run() error { err = command.Wait() } + if opts.PropExitStatus { + c.exitStatus = c.getExitStatus(err) + } + // get reader even on error, so we can still print the stdout and stderr of the failed child process + c.reader = bufio.NewReader(bytes.NewReader(readed)) if err != nil { return errors.Wrapf(err, "wait cmd #%d: %s", c.ID, c.Cmd) } - c.reader = bufio.NewReader(bytes.NewReader(readed)) return nil } @@ -406,6 +477,9 @@ func (c *Command) run() error { err = command.Wait() } } + if opts.PropExitStatus { + c.exitStatus = c.getExitStatus(err) + } if err != nil { return errors.Wrapf(err, "wait cmd #%d: %s", c.ID, c.Cmd) } @@ -420,8 +494,11 @@ type Options struct { KeepOrder bool // keep output order Retries int // max retry chances RetryInterval time.Duration // retry interval + PrintRetryOutput bool // print output from retries Timeout time.Duration // timeout StopOnErr bool // stop on any error + PropExitStatus bool // propagate child exit status + KillOnCtrlC bool // kill child processes on ctrl-c RecordSuccessfulCmd bool // send successful command to channel Verbose bool } @@ -429,11 +506,11 @@ type Options struct { // Run4Output runs commands in parallel from channel chCmdStr, // and returns an output text channel, // and a done channel to ensure safe exit. -func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan string, chan string, chan int) { +func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan string, chan string, chan int, chan int) { if opts.Verbose { Verbose = true } - chCmd, chSuccessfulCmd, doneChCmd := Run(opts, cancel, chCmdStr) + chCmd, chSuccessfulCmd, doneChCmd, chExitStatus := Run(opts, cancel, chCmdStr) chOut := make(chan string, opts.Jobs) done := make(chan int) @@ -553,13 +630,34 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan // } done <- 1 }() - return chOut, chSuccessfulCmd, done + return chOut, chSuccessfulCmd, done, chExitStatus +} + +// write strings and report done +func combineWorker(input <-chan string, output chan<- string, wg *sync.WaitGroup) { + defer wg.Done() + for val := range input { + output <- val + } +} + +// combine strings in input order +func combine(inputs []<-chan string, output chan<- string) { + group := new(sync.WaitGroup) + go func() { + for _, input := range inputs { + group.Add(1) + go combineWorker(input, output, group) + group.Wait() // preserve input order + } + close(output) + }() } // Run runs commands in parallel from channel chCmdStr, // and returns a Command channel, // and a done channel to ensure safe exit. -func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Command, chan string, chan int) { +func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Command, chan string, chan int, chan int) { if opts.Verbose { Verbose = true } @@ -570,6 +668,10 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma chSuccessfulCmd = make(chan string, opts.Jobs) } done := make(chan int) + var chExitStatus chan int + if opts.PropExitStatus { + chExitStatus = make(chan int) + } go func() { var wg sync.WaitGroup @@ -607,11 +709,20 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma } chances := opts.Retries + var outputsToPrint []<-chan string for { - err := command.Run() + ch, err := command.Run(opts) if err != nil { // fail to run if chances == 0 || opts.StopOnErr { + // print final output + outputsToPrint = append(outputsToPrint, ch) Log.Error(err) + if opts.PropExitStatus { + chExitStatus <- command.exitStatus + } + command.Ch = make(chan string, 1) + combine(outputsToPrint, command.Ch) + chCmd <- command } else { Log.Warning(err) } @@ -626,6 +737,9 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma if opts.RecordSuccessfulCmd { close(chSuccessfulCmd) } + if opts.PropExitStatus { + close(chExitStatus) + } done <- 1 } @@ -633,6 +747,9 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma return } if chances > 0 { + if opts.PrintRetryOutput { + outputsToPrint = append(outputsToPrint, ch) + } if Verbose && opts.Retries > 0 { Log.Warningf("retry %d/%d times: %s", opts.Retries-chances+1, @@ -644,9 +761,16 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma } return } + // print final output + outputsToPrint = append(outputsToPrint, ch) + if opts.PropExitStatus { + chExitStatus <- command.exitStatus + } break } + command.Ch = make(chan string, 1) + combine(outputsToPrint, command.Ch) chCmd <- command if opts.RecordSuccessfulCmd { chSuccessfulCmd <- cmdStr @@ -663,7 +787,10 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma } } + if opts.PropExitStatus { + close(chExitStatus) + } done <- 1 }() - return chCmd, chSuccessfulCmd, done + return chCmd, chSuccessfulCmd, done, chExitStatus } diff --git a/process/process_others.go b/process/process_others.go new file mode 100644 index 0000000..49660fa --- /dev/null +++ b/process/process_others.go @@ -0,0 +1,35 @@ +// +build !windows + +// Copyright © 2017 Wei Shen +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package process + +import ( + "os/exec" +) + +func (c *Command) setWindowsCommandAttr(command *exec.Cmd, qcmd string) { + if isWindows { + panic("should have called process_windows.go setWindowsCommandAttr()!") + } else { + // noop for all platforms except windows + } +} diff --git a/process/process_windows.go b/process/process_windows.go new file mode 100644 index 0000000..2075020 --- /dev/null +++ b/process/process_windows.go @@ -0,0 +1,42 @@ +// +build windows + +// Copyright © 2017 Wei Shen +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package process + +import ( + "fmt" + "os/exec" + "syscall" +) + +// from https://github.com/junegunn/fzf/blob/390b49653b441c958b82a0f78d9923aef4c1d9a2/src/util/util_windows.go +func (c *Command) setWindowsCommandAttr(command *exec.Cmd, qcmd string) { + if isWindows { + command.SysProcAttr = &syscall.SysProcAttr{ + HideWindow: false, + CmdLine: fmt.Sprintf(` /s /c "%s"`, qcmd), + CreationFlags: 0, + } + } else { + panic("should have called process_others.go setWindowsCommandAttr()!") + } +} diff --git a/root.go b/root.go index 5db82c1..927d640 100644 --- a/root.go +++ b/root.go @@ -133,8 +133,11 @@ Homepage: https://github.com/shenwei356/rush KeepOrder: config.KeepOrder, Retries: config.Retries, RetryInterval: time.Duration(config.RetryInterval) * time.Second, + PrintRetryOutput: config.PrintRetryOutput, Timeout: time.Duration(config.Timeout) * time.Second, StopOnErr: config.StopOnErr, + PropExitStatus: config.PropExitStatus, + KillOnCtrlC: config.KillOnCtrlC, Verbose: config.Verbose, RecordSuccessfulCmd: config.Continue, } @@ -167,6 +170,8 @@ Homepage: https://github.com/shenwei356/rush // channel of command chCmdStr := make(chan string, config.Jobs) + anyCommands := false + // read data and generate command go func() { n := config.NRecords @@ -227,9 +232,11 @@ Homepage: https://github.com/shenwei356/rush bfhSuccCmds.Flush() } else { chCmdStr <- cmdStr + anyCommands = true } } else { chCmdStr <- cmdStr + anyCommands = true } id++ @@ -254,9 +261,11 @@ Homepage: https://github.com/shenwei356/rush bfhSuccCmds.Flush() } else { chCmdStr <- cmdStr + anyCommands = true } } else { chCmdStr <- cmdStr + anyCommands = true } } } @@ -273,7 +282,7 @@ Homepage: https://github.com/shenwei356/rush // --------------------------------------------------------------- // run - chOutput, chSuccessfulCmd, doneSendOutput := process.Run4Output(opts, cancel, chCmdStr) + chOutput, chSuccessfulCmd, doneSendOutput, chExitStatus := process.Run4Output(opts, cancel, chCmdStr) // read from chOutput and print doneOutput := make(chan int) @@ -302,6 +311,35 @@ Homepage: https://github.com/shenwei356/rush }() } + var pToolExitStatus *int = nil + var doneExitStatus chan int + if config.PropExitStatus { + doneExitStatus = make(chan int) + toolExitStatus := 0 + go func() { + for childCode := range chExitStatus { + setPointer := false + setCode := false + if pToolExitStatus == nil { + setPointer = true + setCode = true + } else { + // use the code from the first error we received + if *pToolExitStatus == 0 && childCode != 0 { + setCode = true + } + } + if setPointer { + pToolExitStatus = &toolExitStatus + } + if setCode { + *pToolExitStatus = childCode + } + } + doneExitStatus <- 1 + }() + } + // --------------------------------------------------------------- chExitSignalMonitor := make(chan struct{}) @@ -329,12 +367,25 @@ Homepage: https://github.com/shenwei356/rush <-donePreprocessFiles // finish read data and send command <-doneSendOutput // finish send output <-doneOutput // finish print output + if config.PropExitStatus { + <-doneExitStatus + } if config.Continue { <-doneSaveSuccCmd } close(chExitSignalMonitor) <-cleanupDone + + if config.PropExitStatus { + if anyCommands { + if pToolExitStatus != nil { + os.Exit(*pToolExitStatus) + } else { + checkError(fmt.Errorf(`did not get an exit status int from any child process)`)) + } + } + } }, } @@ -369,10 +420,13 @@ func init() { RootCmd.Flags().IntP("retries", "r", 0, "maximum retries (default 0)") RootCmd.Flags().IntP("retry-interval", "", 0, "retry interval (unit: second) (default 0)") + RootCmd.Flags().BoolP("print-retry-output", "", true, "print output from retry commands") RootCmd.Flags().IntP("timeout", "t", 0, "timeout of a command (unit: second, 0 for no timeout) (default 0)") RootCmd.Flags().BoolP("keep-order", "k", false, "keep output in order of input") RootCmd.Flags().BoolP("stop-on-error", "e", false, "stop all processes on first error(s)") + RootCmd.Flags().BoolP("propagate-exit-status", "", true, "propagate child exit status up to the exit status of rush") + RootCmd.Flags().BoolP("kill-on-ctrl-c", "", true, "kill child processes on ctrl-c") RootCmd.Flags().BoolP("dry-run", "", false, "print command but not run") RootCmd.Flags().BoolP("continue", "c", false, `continue jobs.`+ @@ -484,13 +538,16 @@ type Config struct { FieldDelimiter string reFieldDelimiter *regexp.Regexp - Retries int - RetryInterval int - Timeout int + Retries int + RetryInterval int + PrintRetryOutput bool + Timeout int - KeepOrder bool - StopOnErr bool - DryRun bool + KeepOrder bool + StopOnErr bool + PropExitStatus bool + KillOnCtrlC bool + DryRun bool Continue bool SuccCmdFile string @@ -549,13 +606,16 @@ func getConfigs(cmd *cobra.Command) Config { NRecords: getFlagPositiveInt(cmd, "nrecords"), FieldDelimiter: getFlagString(cmd, "field-delimiter"), - Retries: getFlagNonNegativeInt(cmd, "retries"), - RetryInterval: getFlagNonNegativeInt(cmd, "retry-interval"), - Timeout: getFlagNonNegativeInt(cmd, "timeout"), + Retries: getFlagNonNegativeInt(cmd, "retries"), + RetryInterval: getFlagNonNegativeInt(cmd, "retry-interval"), + PrintRetryOutput: getFlagBool(cmd, "print-retry-output"), + Timeout: getFlagNonNegativeInt(cmd, "timeout"), - KeepOrder: getFlagBool(cmd, "keep-order"), - StopOnErr: getFlagBool(cmd, "stop-on-error"), - DryRun: getFlagBool(cmd, "dry-run"), + KeepOrder: getFlagBool(cmd, "keep-order"), + StopOnErr: getFlagBool(cmd, "stop-on-error"), + PropExitStatus: getFlagBool(cmd, "propagate-exit-status"), + KillOnCtrlC: getFlagBool(cmd, "kill-on-ctrl-c"), + DryRun: getFlagBool(cmd, "dry-run"), Continue: getFlagBool(cmd, "continue"), SuccCmdFile: getFlagString(cmd, "succ-cmd-file"),