Skip to content

Commit

Permalink
some changes, update doc, release v0.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
shenwei356 committed Jan 5, 2017
1 parent b1398f6 commit 570cd8d
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 60 deletions.
68 changes: 57 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
# rush

[![Go Report Card](https://goreportcard.com/badge/github.com/shenwei356/rush)](https://goreportcard.com/report/github.com/shenwei356/rush)
[![Latest Version](https://img.shields.io/github/release/shenwei356/rush.svg?style=flat?maxAge=86400)](https://github.com/shenwei356/rush/releases)
[![Github Releases](https://img.shields.io/github/downloads/shenwei356/rush/latest/total.svg?maxAge=3600)](http://bioinf.shenwei.me/rush/download/)

`rush` -- parallelly execute shell commands.

`rush` is a tool similar to [GNU parallel](https://www.gnu.org/software/parallel/)
and [gargs](https://github.com/brentp/gargs).
`rush` borrows some idea from them and has some unique features,
e.g., more advanced embeded strings replacement than `parallel`.

**Source code:** [https://github.com/shenwei356/rush](https://github.com/shenwei356/rush)
[![GitHub stars](https://img.shields.io/github/stars/shenwei356/rush.svg?style=social&label=Star&?maxAge=2592000)](https://github.com/shenwei356/rush)
[![license](https://img.shields.io/github/license/shenwei356/rush.svg?maxAge=2592000)](https://github.com/shenwei356/rush/blob/master/LICENSE)
[![Go Report Card](https://goreportcard.com/badge/github.com/shenwei356/rush)](https://goreportcard.com/report/github.com/shenwei356/rush)

**Latest version:** [![Latest Version](https://img.shields.io/github/release/shenwei356/rush.svg?style=flat?maxAge=86400)](https://github.com/shenwei356/rush/releases)
[![Github Releases](https://img.shields.io/github/downloads/shenwei356/rush/latest/total.svg?maxAge=3600)](http://bioinf.shenwei.me/rush/download/)

## Features

Major:
Expand All @@ -29,7 +25,7 @@ Major:
save status after [capturing ctrl+c](https://nathanleclaire.com/blog/2014/08/24/handling-ctrl-c-interrupt-signal-in-golang-programs/)
- [x] support positional replacement strings: `{n}`
- [x] columns in delimiter-delimited data
- [x] matches of regular expression
- [ ] matches of regular expression
- [x] GNU parallel like replacement strings:
- [x] `{#}`, job number
- [x] `{}`, full line
Expand All @@ -38,7 +34,7 @@ Major:
- [x] `{/}`, dirname (`{//}` in GNU parallel)
- [x] `{%}`, basename (`{/}` in GNU parallel)
- [x] possible combinations:
- [x] `{%.}`, `{%,}`
- [x] `{%.}`, `{%:}`
- [x] `{n.}`, `{n/}` ...
- [x] `awk -v` like defined variables
- [ ] appropriate quoting
Expand Down Expand Up @@ -70,7 +66,57 @@ Minor:

## Performance

See on [release page](https://github.com/shenwei356/rush/releases)
See on [release page](https://github.com/shenwei356/rush/releases).

## Usage & Examples

```
rush -- parallelly execute shell commands
Version: 0.0.2
Author: Wei Shen <[email protected]>
Source code: https://github.com/shenwei356/rush
Usage:
rush [flags] [command] [args of command...]
Examples:
1. simple run : seq 1 10 | rush echo {} # quoting is not necessary
2. keep order : seq 1 10 | rush 'echo {}' -k
3. with timeout: seq 1 | rush 'sleep 2; echo {}' -t 1
4. retry : seq 1 | rush 'python script.py' -r 3
5. basename : echo dir/file.txt.gz | rush 'echo {%}'  # file.txt.gz
6. dirname : echo dir/file.txt.gz | rush 'echo {/}' # dir
7. basename without last extension
: echo dir/file.txt.gz | rush 'echo {%.}' # file.txt
8. basename without last extension
: echo dir/file.txt.gz | rush 'echo {%:}' # file
9. job ID, combine fields and other replacement string
: echo 123 file.txt | rush 'echo job {#}: {2} {2.} {1}'
# job 1: file.txt file 123
Flags:
-v, --assign stringSlice assign the value val to the variable var (format: var=val)
--dry-run print command but not run
-d, --field-delimiter string field delimiter in records (default "\s+")
-i, --infile stringSlice input data file
-j, --jobs int run n jobs in parallel (default 4)
-k, --keep-order keep output in order of input
-n, --nrecords int number of records sent to a command (default 1)
-o, --out-file string out file ("-" for stdout) (default "-")
-D, --record-delimiter string record delimiter (default is "\n") (default "
")
-r, --retries int maximum retries
--retry-interval int retry interval (unit: second)
-e, --stop-on-error stop all processes on first error
-t, --timeout int timeout of a command (unit: second, 0 for no timeout)
--trim string trim white space in input (available values: "l" for left, "r" for right, "lr", "rl", "b" for both side)
--verbose print verbose information
-V, --version print version information and check for update
```

## Acknowledgements

Expand Down
2 changes: 1 addition & 1 deletion rush/cmd/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// VERSION of this package
const VERSION = "0.0.1"
const VERSION = "0.0.2"

func isStdin(file string) bool {
return file == "-"
Expand Down
51 changes: 26 additions & 25 deletions rush/cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var TmpOutputDataBuffer = 1048576 // 1M
// OutputChunkSize is buffer size of output string chunk sent to channel, default 16K.
var OutputChunkSize = 16384 // 16K

// Run starts to run
// Run runs a command and send output to command.Ch in background.
func (c *Command) Run() error {
c.Ch = make(chan string, runtime.NumCPU())

Expand All @@ -93,7 +93,7 @@ func (c *Command) Run() error {
}

if Verbose {
log.Infof("finished command in %s: %s", c.Duration, c.Cmd)
log.Infof("finish cmd #%d in %s: %s", c.ID, c.Duration, c.Cmd)
}

go func() {
Expand All @@ -115,9 +115,9 @@ func (c *Command) Run() error {
c.Ch <- string(buf[0:n])
}

if Verbose {
log.Infof("finished reading data from: %s", c.Cmd)
}
// if Verbose {
// log.Infof("finish reading data from: %s", c.Cmd)
// }

close(c.Ch)
c.finishSendOutput = true
Expand All @@ -137,26 +137,27 @@ func getShell() string {
func (c *Command) Cleanup() error {
var err error
if c.tmpfh != nil {
if Verbose {
log.Infof("close tmpfh for: %s", c.Cmd)
}
// if Verbose {
// log.Infof("close tmpfh for: %s", c.Cmd)
// }
err = c.tmpfh.Close()
if err != nil {
return err
}
}

if c.tmpfile != "" {
if Verbose {
log.Infof("remove tmpfile of command: %s", c.Cmd)
}
// if Verbose {
// log.Infof("remove tmpfile of command: %s", c.Cmd)
// }
err = os.Remove(c.tmpfile)
}
return err
}

// run a command and save output to c.reader.
// Note that output returns only after finishing run.
// This function is mainly borrowed from https://github.com/brentp/gargs .
func (c *Command) run() error {
t := time.Now()
defer func() {
Expand All @@ -165,7 +166,7 @@ func (c *Command) run() error {
var command *exec.Cmd
qcmd := fmt.Sprintf(`%s`, c.Cmd)
if Verbose {
log.Infof("run command: %s", qcmd)
log.Infof("start cmd #%d: %s", c.ID, qcmd)
}

if c.Timeout > 0 {
Expand Down Expand Up @@ -209,9 +210,9 @@ func (c *Command) run() error {
return errors.Wrapf(err, "run command: %s", c.Cmd)
}

if Verbose {
log.Infof("create tmpfile for command: %s", c.Cmd)
}
// if Verbose {
// log.Infof("create tmpfile for command: %s", c.Cmd)
// }

c.tmpfh, err = ioutil.TempFile("", tmpfilePrefix)
if err != nil {
Expand Down Expand Up @@ -283,9 +284,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
// output the command name
if opts.DryRun {
chOut <- c.Cmd + "\n"
if Verbose {
log.Infof("finished sending cmd name: %s", c.Cmd)
}
// if Verbose {
// log.Infof("finish sending cmd name: %s", c.Cmd)
// }
return
}

Expand All @@ -307,9 +308,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
}
}

if Verbose {
log.Infof("finished receiving data from: %s", c.Cmd)
}
// if Verbose {
// log.Infof("finish receiving data from: %s", c.Cmd)
// }
}(c)
}

Expand Down Expand Up @@ -382,9 +383,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
wg.Wait()
close(chOut)

if Verbose {
log.Infof("finished sending all output")
}
// if Verbose {
// log.Infof("finish sending all output")
// }
done <- 1
}()
return chOut, done
Expand Down Expand Up @@ -454,7 +455,7 @@ func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Comma
wg.Wait()
close(chCmd)
if Verbose {
log.Infof("finished running all %d commands", id-1)
log.Infof("finish running all %d commands", id-1)
}
done <- 1
}()
Expand Down
73 changes: 60 additions & 13 deletions rush/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ Version: %s
Author: Wei Shen <[email protected]>
Documents : http://bioinf.shenwei.me/rush
Source code: https://github.com/shenwei356/rush
`, VERSION),
Expand Down Expand Up @@ -75,7 +74,7 @@ Source code: https://github.com/shenwei356/rush
// -----------------------------------------------------------------

cancel := make(chan struct{})
TmpOutputDataBuffer = config.BufferSize
// TmpOutputDataBuffer = config.BufferSize
opts := &Options{
DryRun: config.DryRun,
Jobs: config.Jobs,
Expand Down Expand Up @@ -168,9 +167,9 @@ Source code: https://github.com/shenwei356/rush

close(chCmdStr)

if Verbose {
log.Infof("finished reading input data")
}
// if Verbose {
// log.Infof("finish reading input data")
// }
donePreprocess <- 1
}()

Expand Down Expand Up @@ -230,7 +229,7 @@ func init() {

RootCmd.Flags().StringSliceP("infile", "i", []string{}, "input data file")

RootCmd.Flags().StringP("record-delimiter", "D", "\n", "record delimiter")
RootCmd.Flags().StringP("record-delimiter", "D", "\n", `record delimiter (default is "\n")`)
RootCmd.Flags().IntP("nrecords", "n", 1, "number of records sent to a command")
RootCmd.Flags().StringP("field-delimiter", "d", `\s+`, "field delimiter in records")

Expand All @@ -240,13 +239,53 @@ func init() {

RootCmd.Flags().BoolP("keep-order", "k", false, "keep output in order of input")
RootCmd.Flags().BoolP("stop-on-error", "e", false, "stop all processes on first error")
RootCmd.Flags().BoolP("continue", "c", false, `continue run commands except for finished commands in "finished.txt"`)
// RootCmd.Flags().BoolP("continue", "c", false, `continue run commands except for finished commands in "finished.txt"`)
RootCmd.Flags().BoolP("dry-run", "", false, "print command but not run")

RootCmd.Flags().IntP("buffer-size", "", 1, "buffer size for output of a command before saving to tmpfile (unit: Mb)")
// RootCmd.Flags().IntP("buffer-size", "", 1, "buffer size for output of a command before saving to tmpfile (unit: Mb)")

RootCmd.Flags().StringSliceP("assign", "v", []string{}, "assign the value val to the variable var (format var=val)")
RootCmd.Flags().StringSliceP("assign", "v", []string{}, "assign the value val to the variable var (format: var=val)")
RootCmd.Flags().StringP("trim", "", "", `trim white space in input (available values: "l" for left, "r" for right, "lr", "rl", "b" for both side)`)

RootCmd.Example = ` 1. simple run : seq 1 10 | rush echo {} # quoting is not necessary
2. keep order : seq 1 10 | rush 'echo {}' -k
3. with timeout: seq 1 | rush 'sleep 2; echo {}' -t 1
4. retry : seq 1 | rush 'python script.py' -r 3
5. basename : echo dir/file.txt.gz | rush 'echo {%}'  # file.txt.gz
6. dirname : echo dir/file.txt.gz | rush 'echo {/}' # dir
7. basename without last extension
: echo dir/file.txt.gz | rush 'echo {%.}' # file.txt
8. basename without last extension
: echo dir/file.txt.gz | rush 'echo {%:}' # file
9. job ID, combine fields and other replacement string
: echo 123 file.txt | rush 'echo job {#}: {2} {2.} {1}'
# job 1: file.txt file 123`

RootCmd.SetUsageTemplate(`Usage:{{if .Runnable}}
{{if .HasAvailableFlags}}{{appendIfNotPresent .UseLine "[flags]"}}{{else}}{{.UseLine}}{{end}}{{end}}{{if .HasAvailableSubCommands}}
{{ .CommandPath}} [command]{{end}} [command] [args of command...]{{if gt .Aliases 0}}
Aliases:
{{.NameAndAliases}}
{{end}}{{if .HasExample}}
Examples:
{{ .Example }}{{end}}{{ if .HasAvailableSubCommands}}
Available Commands:{{range .Commands}}{{if .IsAvailableCommand}}
{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{ if .HasAvailableLocalFlags}}
Flags:
{{.LocalFlags.FlagUsages | trimRightSpace}}{{end}}{{ if .HasAvailableInheritedFlags}}
Global Flags:
{{.InheritedFlags.FlagUsages | trimRightSpace}}{{end}}{{if .HasHelpSubCommands}}
Additional help topics:{{range .Commands}}{{if .IsHelpCommand}}
{{rpad .CommandPath .CommandPathPadding}} {{.Short}}{{end}}{{end}}{{end}}{{ if .HasAvailableSubCommands }}
Use "{{.CommandPath}} [command] --help" for more information about a command.{{end}}
`)
}

// Config is the struct containing all global flags
Expand All @@ -273,7 +312,7 @@ type Config struct {
Continue bool
DryRun bool

BufferSize int
// BufferSize int

AssignMap map[string]string
Trim string
Expand All @@ -298,8 +337,16 @@ func getConfigs(cmd *cobra.Command) Config {
assignStrs := getFlagStringSlice(cmd, "assign")
assignMap := make(map[string]string)
for _, s := range assignStrs {
found := reAssign.FindStringSubmatch(s)
assignMap[found[1]] = found[2]
if reAssign.MatchString(s) {
found := reAssign.FindStringSubmatch(s)
switch found[1] {
case ".", ":", "/", "%", "#":
checkError(fmt.Errorf(`"var" in --v/--assign var=val should not be ".", ":", "/", "%%" or "#", given: "%s"`, found[1]))
}
assignMap[found[1]] = found[2]
} else {
checkError(fmt.Errorf(`illegal value for flag -v/--assign (format: "var=value", e.g., "-v 'a=a bc'"): %s`, s))
}
}

return Config{
Expand All @@ -324,7 +371,7 @@ func getConfigs(cmd *cobra.Command) Config {
Continue: getFlagBool(cmd, "continue"),
DryRun: getFlagBool(cmd, "dry-run"),

BufferSize: getFlagPositiveInt(cmd, "buffer-size") * 1048576,
// BufferSize: getFlagPositiveInt(cmd, "buffer-size") * 1048576,

Trim: trim,
AssignMap: assignMap,
Expand Down
2 changes: 1 addition & 1 deletion rush/cmd/string_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var reCharsCheck = regexp.MustCompile(`^(\d+)*[^\d]*$`)
func fillCommand(config Config, command string, chunk Chunk) string {
founds := rePlaceHolder.FindAllStringSubmatchIndex(command, -1)
if len(founds) == 0 {
return command + "\n"
return command
}
fieldsStr := strings.Join(chunk.Data, config.RecordDelimiter)
switch config.Trim {
Expand Down
Loading

0 comments on commit 570cd8d

Please sign in to comment.