Skip to content

Commit

Permalink
implement retry interval and some minor feature, update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
shenwei356 committed Jan 7, 2017
1 parent b7cce13 commit 7f6b1c3
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 105 deletions.
98 changes: 61 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ Major:
- [ ] support continue,
save status after [capturing ctrl+c](https://nathanleclaire.com/blog/2014/08/24/handling-ctrl-c-interrupt-signal-in-golang-programs/)
- [x] Replacement strings (like GNU parallel):
- [x] `{#}`, job number
- [x] `{}`, full line
- [x] `{#}`, job ID
- [x] `{}`, full data
- [x] support positional replacement strings: `{n}`
- [x] `n`th field in delimiter-delimited data
- [ ] `n`th matches of regular expression
- [ ] `n`th matche of regular expression
- [x] direcotry and file
- [x] `{/}`, dirname (`{//}` in GNU parallel)
- [x] `{%}`, basename (`{/}` in GNU parallel)
Expand Down Expand Up @@ -82,7 +82,7 @@ See on [release page](https://github.com/shenwei356/rush/releases).

1. Read data from file (`-i`)

$ rush echo {} -i data.txt
$ rush echo {} -i data1.txt -i data2.txt

1. Keep output order (`-k`)

Expand All @@ -108,7 +108,7 @@ See on [release page](https://github.com/shenwei356/rush/releases).
python: can't open file 'unexisted_script.py': [Errno 2] No such file or directory
[ERRO] wait command: python unexisted_script.py: exit status 2

1. Directory (`{/}`) and basename (`{%}`)
1. Dirname (`{/}`) and basename (`{%}`)

$ echo dir/file.txt.gz | rush 'echo {/} {%}'
dir file.txt.gz
Expand All @@ -120,8 +120,8 @@ See on [release page](https://github.com/shenwei356/rush/releases).

1. Job ID, combine fields index and other replacement strings

$ echo 123 file.txt | rush 'echo job {#}: {2} {2.} {1}'
job 1: file.txt file 123
$ echo 123 file.txt | rush 'echo job {#}: {2} {2.}'
job 1: file.txt file

1. Custom field delimiter (`-d`)

Expand All @@ -146,17 +146,31 @@ See on [release page](https://github.com/shenwei356/rush/releases).
CCCC

$ echo -ne ">seq1\nactg\n>seq2\nAAAA\n>seq3\nCCCC" | rush -D ">" 'echo FASTA record {#}: name: {1} sequence: {2}' -k -d "\n"
FASTA record 1: name: sequence:
FASTA record 2: name: seq1 sequence: actg
FASTA record 3: name: seq2 sequence: AAAA
FASTA record 4: name: seq3 sequence: CCCC

1. Assign value to variable, like `awk -v` (`-v`)

$ seq 1 | rush 'echo Hello, {fname} {lname}!' -v fname=Wei -v lname=Shen
Hello, Wei Shen!

$ for var in a b; do \
$ seq 1 3 | rush -k -v var=$var 'echo var: {var}, data: {}'; \
$ done
var: a, data: 1
var: a, data: 2
var: a, data: 3
var: b, data: 1
var: b, data: 2
var: b, data: 3

## Usage

```
rush -- parallelly execute shell commands
Version: 0.0.3
Version: 0.0.4
Author: Wei Shen <[email protected]>
Expand All @@ -166,36 +180,46 @@ Usage:
rush [flags] [command] [args of command...]
Examples:
1. simple run : seq 1 10 | rush echo {} # quoting is not necessary
2. keep order : seq 1 10 | rush 'echo {}' -k
3. with timeout: seq 1 | rush 'sleep 2; echo {}' -t 1
4. retry : seq 1 | rush 'python script.py' -r 3
5. basename : echo dir/file.txt.gz | rush 'echo {%}'  # file.txt.gz
6. dirname : echo dir/file.txt.gz | rush 'echo {/}' # dir
7. basename without last extension
: echo dir/file.txt.gz | rush 'echo {%.}' # file.txt
8. basename without last extension
: echo dir/file.txt.gz | rush 'echo {%:}' # file
9. job ID, combine fields and other replacement string
: echo 123 file.txt | rush 'echo job {#}: {2} {2.} {1}'
# job 1: file.txt file 123
1. simple run, quoting is not necessary
$ seq 1 10 | rush echo {}
2. keep order
$ seq 1 10 | rush 'echo {}' -k
3. timeout
$ seq 1 | rush 'sleep 2; echo {}' -t 1
4. retry
$ seq 1 | rush 'python script.py' -r 3
5. dirname & basename
$ echo dir/file.txt.gz | rush 'echo {/} {%}'
dir file.txt.gz
6. basename without last or any extension
$ echo dir/file.txt.gz | rush 'echo {%.} {%:}'
file.txt file
7. job ID, combine fields and other replacement strings
$ echo 123 file.txt | rush 'echo job {#}: {2} {2.}'
job 1: file.txt file
8. custom field delimiter
$ echo a=b=c | rush 'echo {1} {2} {3}' -d =
a b c
9. assign value to variable, like "awk -v"
$ seq 1 | rush 'echo Hello, {fname} {lname}!' -v fname=Wei -v lname=Shen
Hello, Wei Shen!
More examples: https://github.com/shenwei356/rush
Flags:
-v, --assign stringSlice assign the value val to the variable var (format: var=val)
--dry-run print command but not run
-d, --field-delimiter string field delimiter in records (default "\s+")
-i, --infile stringSlice input data file
-j, --jobs int run n jobs in parallel (default 4)
-d, --field-delimiter string field delimiter in records, support regular expression (default "\s+")
-i, --infile stringSlice input data file, multi-values supported
-j, --jobs int run n jobs in parallel (default value depends on your device) (default 4)
-k, --keep-order keep output in order of input
-n, --nrecords int number of records sent to a command (default 1)
-o, --out-file string out file ("-" for stdout) (default "-")
-D, --record-delimiter string record delimiter (default is "\n") (default "
-D, --record-delimiter string record delimiter, supports regular expression (default is "\n") (default "
")
-r, --retries int maximum retries
--retry-interval int retry interval (unit: second)
-r, --retries int maximum retries (default 0)
--retry-interval int retry interval (unit: second) (default 0)
-e, --stop-on-error stop all processes on first error(s)
-t, --timeout int timeout of a command (unit: second, 0 for no timeout)
-t, --timeout int timeout of a command (unit: second, 0 for no timeout) (default 0)
--trim string trim white space in input (available values: "l" for left, "r" for right, "lr", "rl", "b" for both side)
--verbose print verbose information
-V, --version print version information and check for update
Expand Down Expand Up @@ -233,17 +257,17 @@ For Go developer, just one command:

## Download

Latest version: [rush v0.0.3](https://github.com/shenwei356/rush/releases/tag/v0.0.3)
[![Github Releases (by Release)](https://img.shields.io/github/downloads/shenwei356/rush/v0.0.3/total.svg)](https://github.com/shenwei356/rush/releases/tag/v0.0.3)
[rush v0.0.4](https://github.com/shenwei356/rush/releases/tag/v0.0.4)
[![Github Releases (by Release)](https://img.shields.io/github/downloads/shenwei356/rush/v0.0.4/total.svg)](https://github.com/shenwei356/rush/releases/tag/v0.0.4)

OS |Arch |File |Download Count
:------|:---------|:-----------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Linux |32-bit |[rush_linux_386.tar.gz](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_linux_386.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_linux_386.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_linux_386.tar.gz)
Linux |**64-bit**|[**rush_linux_amd64.tar.gz**](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_linux_amd64.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_linux_amd64.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_linux_amd64.tar.gz)
OS X |32-bit |[rush_darwin_386.tar.gz](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_darwin_386.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_darwin_386.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_darwin_386.tar.gz)
OS X |**64-bit**|[**rush_darwin_amd64.tar.gz**](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_darwin_amd64.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_darwin_amd64.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_darwin_amd64.tar.gz)
Windows|32-bit |[rush_windows_386.exe.tar.gz](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_windows_386.exe.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_windows_386.exe.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_windows_386.exe.tar.gz)
Windows|**64-bit**|[**rush_windows_amd64.exe.tar.gz**](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_windows_amd64.exe.tar.gz)|[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_windows_amd64.exe.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.3/rush_windows_amd64.exe.tar.gz)
Linux |32-bit |[rush_linux_386.tar.gz](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_linux_386.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_linux_386.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_linux_386.tar.gz)
Linux |**64-bit**|[**rush_linux_amd64.tar.gz**](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_linux_amd64.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_linux_amd64.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_linux_amd64.tar.gz)
OS X |32-bit |[rush_darwin_386.tar.gz](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_darwin_386.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_darwin_386.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_darwin_386.tar.gz)
OS X |**64-bit**|[**rush_darwin_amd64.tar.gz**](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_darwin_amd64.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_darwin_amd64.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_darwin_amd64.tar.gz)
Windows|32-bit |[rush_windows_386.exe.tar.gz](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_windows_386.exe.tar.gz) |[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_windows_386.exe.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_windows_386.exe.tar.gz)
Windows|**64-bit**|[**rush_windows_amd64.exe.tar.gz**](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_windows_amd64.exe.tar.gz)|[![Github Releases (by Asset)](https://img.shields.io/github/downloads/shenwei356/rush/latest/rush_windows_amd64.exe.tar.gz.svg?maxAge=3600)](https://github.com/shenwei356/rush/releases/download/v0.0.4/rush_windows_amd64.exe.tar.gz)

## Acknowledgements

Expand Down
2 changes: 1 addition & 1 deletion rush/cmd/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// VERSION of this package
const VERSION = "0.0.3"
const VERSION = "0.0.4"

func isStdin(file string) bool {
return file == "-"
Expand Down
93 changes: 57 additions & 36 deletions rush/cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,22 @@ import (

// Command is the Command struct
type Command struct {
ID uint64
Cmd string

Cancel chan struct{}
Timeout time.Duration
ctx context.Context
ctxCancel context.CancelFunc

Ch chan string // buffer for output
finishSendOutput bool

Err error
Duration time.Duration

reader *bufio.Reader
tmpfile string
tmpfh *os.File
ID uint64 // ID
Cmd string // command

Cancel chan struct{} // channel for close
Timeout time.Duration // time out
ctx context.Context // context.WithTimeout
ctxCancel context.CancelFunc // cancel func for timetout

Ch chan string // channel for stdout
reader *bufio.Reader // reader for stdout
tmpfile string // tmpfile for stdout
tmpfh *os.File // file handler for tmpfile
finishSendOutput bool // a flag of whether finished sending output to Ch

Err error // Error
Duration time.Duration // runtime
}

// NewCommand create a Command
Expand Down Expand Up @@ -174,9 +173,12 @@ func (c *Command) ExitCode() int {
return 1
}

// ErrTimeout means timeout
// ErrTimeout means command timeout
var ErrTimeout = fmt.Errorf("time out")

// ErrCancelled means command being cancelled
var ErrCancelled = fmt.Errorf("cancelled")

func splitCmdAndArgs(command string) (string, string) {
var c, args string
command = strings.Trim(command, " ")
Expand All @@ -189,14 +191,17 @@ func splitCmdAndArgs(command string) (string, string) {
return command, ""
}

// run a command and save output to c.reader.
// run a command and pass output to c.reader.
// Note that output returns only after finishing run.
// This function is mainly borrowed from https://github.com/brentp/gargs .
func (c *Command) run() error {
t := time.Now()
chCancelMonitor := make(chan struct{})
defer func() {
close(chCancelMonitor)
c.Duration = time.Now().Sub(t)
}()

var command *exec.Cmd
qcmd := fmt.Sprintf(`%s`, c.Cmd)
if Verbose {
Expand All @@ -222,22 +227,36 @@ func (c *Command) run() error {

pipeStdout, err := command.StdoutPipe()
if err != nil {
return errors.Wrapf(err, "get stdout pipe of command: %s", c.Cmd)
return errors.Wrapf(err, "get stdout pipe of cmd #%d: %s", c.ID, c.Cmd)
}
defer pipeStdout.Close()

command.Stderr = os.Stderr

err = command.Start()
if err != nil {
checkError(errors.Wrapf(err, "start command: %s", c.Cmd))
checkError(errors.Wrapf(err, "start cmd #%d: %s", c.ID, c.Cmd))
}

bpipe := bufio.NewReaderSize(pipeStdout, TmpOutputDataBuffer)

chErr := make(chan error, 1) // may from two sources, must be buffered
chErr := make(chan error, 1) // may come from two sources, must be buffered
chEndBeforeTimeout := make(chan struct{})

go func() {
for {
select {
case <-c.Cancel:
chErr <- ErrCancelled
log.Infof("cancel cmd #%d: %s", c.ID, c.Cmd)
command.Process.Kill()
return
case <-chCancelMonitor:
return
}
}
}()

// detect timeout
if c.Timeout > 0 {
go func() { // goroutine #T
Expand Down Expand Up @@ -288,15 +307,15 @@ func (c *Command) run() error {
}

if err != nil {
return errors.Wrapf(err, "wait command: %s", c.Cmd)
return errors.Wrapf(err, "wait cmd #%d: %s", c.ID, c.Cmd)
}
c.reader = bufio.NewReader(bytes.NewReader(readed))
return nil
}

// more than TmpOutputDataBuffer bytes in output. must use tmpfile
if err != nil {
return errors.Wrapf(err, "run command #%d: %s", c.ID, c.Cmd)
return errors.Wrapf(err, "run cmd #%d: %s", c.ID, c.Cmd)
}

// if Verbose {
Expand All @@ -305,7 +324,7 @@ func (c *Command) run() error {

c.tmpfh, err = ioutil.TempFile("", tmpfilePrefix)
if err != nil {
return errors.Wrapf(err, "create tmpfile for command: %s", c.Cmd)
return errors.Wrapf(err, "create tmpfile for cmd #%d: %s", c.ID, c.Cmd)
}

c.tmpfile = c.tmpfh.Name()
Expand Down Expand Up @@ -335,21 +354,22 @@ func (c *Command) run() error {
}
}
if err != nil {
return errors.Wrapf(err, "wait command: %s", c.Cmd)
return errors.Wrapf(err, "wait cmd #%d: %s", c.ID, c.Cmd)
}

return nil
}

// Options contains the options
type Options struct {
DryRun bool // just print command
Jobs int // max jobs number
KeepOrder bool // keep output order
Retries int // max retry chances
Timeout time.Duration // timeout
StopOnErr bool // stop on any error
Verbose bool
DryRun bool // just print command
Jobs int // max jobs number
KeepOrder bool // keep output order
Retries int // max retry chances
RetryInterval time.Duration // retry interval
Timeout time.Duration // timeout
StopOnErr bool // stop on any error
Verbose bool
}

// Run4Output runs commands in parallel from channel chCmdStr,
Expand Down Expand Up @@ -401,7 +421,7 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
for line = range c.Ch {
chOut <- line
}
checkError(errors.Wrapf(c.Cleanup(), "remove tmpfile for cmd: %s", c.Cmd))
checkError(errors.Wrapf(c.Cleanup(), "remove tmpfile for cmd #%d: %s", c.ID, c.Cmd))
break LOOP
}
}
Expand All @@ -412,7 +432,7 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
}(c)
}

} else { // keep drder
} else { // keep order
wg.Add(1)

var id uint64 = 1
Expand Down Expand Up @@ -534,7 +554,7 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma
done <- 1
log.Error("stop on first error(s)")
}
os.Exit(1)
os.Exit(1) // too violent
}
if chances > 0 {
if Verbose && opts.Retries > 0 {
Expand All @@ -543,6 +563,7 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma
opts.Retries, command.Cmd)
}
chances--
<-time.After(opts.RetryInterval)
continue
}
return
Expand Down
Loading

0 comments on commit 7f6b1c3

Please sign in to comment.