Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup large dumps with STDIN #35

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ jobs:
- name: install redis-cli
run: sudo apt-get install redis-tools

- name: Update postgres tools
run: |
sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install -y postgresql-client

- name: Test
run: go test -v ./...
env:
Expand Down
1 change: 0 additions & 1 deletion cmd/pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"

"github.com/mittwald/brudi/pkg/source/pgdump"

"github.com/spf13/cobra"
Expand Down
85 changes: 71 additions & 14 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"bufio"
"bytes"
"compress/gzip"
"context"
"fmt"
Expand All @@ -20,7 +21,7 @@ import (
const flagTag = "flag"
const gzipType = "application/x-gzip"

// includeFlag returns an string slice of [<flag>, <val>], or [<val>]
// includeFlag returns a string slice of [<flag>, <val>], or [<val>]
func includeFlag(flag, val string) []string {
var cmd []string
if flag != "" {
Expand Down Expand Up @@ -107,6 +108,9 @@ func StructToCLI(optionStruct interface{}) []string {
if flag == "-" {
continue
}
if fieldVal == nil {
continue
}

switch t := fieldVal.(type) {
case int:
Expand Down Expand Up @@ -188,33 +192,86 @@ func ParseCommandLine(cmd CommandType) []string {
}

// RunWithTimeout executes the given binary within a max execution time
func RunWithTimeout(runContext context.Context, cmd CommandType, timeout time.Duration) ([]byte, error) {
func RunWithTimeout(runContext context.Context, cmd *CommandType, outputToPipe bool, timeout time.Duration) ([]byte, error) {
ctx, cancel := context.WithTimeout(runContext, timeout)
defer cancel()

return Run(ctx, cmd)
return Run(ctx, cmd, outputToPipe)
}

// Run executes the given binary
func Run(ctx context.Context, cmd CommandType) ([]byte, error) {
var out []byte
var err error
commandLine := ParseCommandLine(cmd)
func Run(ctx context.Context, cmd *CommandType, outputToPipe bool) ([]byte, error) {
if outputToPipe && cmd.Pipe != nil {
return nil, errors.New("output is supposed to be used but Pipe is already not nil")
}
var cmdExec *exec.Cmd
var outBuffer bytes.Buffer
var stdin io.WriteCloser
var err, pipeErr error
commandLine := ParseCommandLine(*cmd)
log.WithField("command", strings.Join(commandLine, " ")).Debug("executing command")
if ctx != nil {
out, err = exec.CommandContext(ctx, commandLine[0], commandLine[1:]...).CombinedOutput() //nolint: gosec
if ctx.Err() != nil {
return out, fmt.Errorf("failed to execute command: timed out or canceled")
if ctx == nil {
ctx = context.Background()
}
cCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
cmdExec = exec.CommandContext(cCtx, commandLine[0], commandLine[1:]...) //nolint: gosec
if outputToPipe {
cmd.PipeReady.L.Lock()
cmd.Pipe, err = cmdExec.StdoutPipe()
cmd.PipeReady.L.Unlock()
if err != nil {
defer cmd.PipeReady.Broadcast()
return nil, errors.Wrapf(err, "error while getting STDOUT pipe for command: %s", strings.Join(commandLine, " "))
}
cmd.ReadingDone = make(chan bool, 1)
cmd.PipeReady.Broadcast()
} else {
out, err = exec.Command(commandLine[0], commandLine[1:]...).CombinedOutput() //nolint: gosec
cmdExec.Stdout = &outBuffer
}
cmdExec.Stderr = &outBuffer
if cmd.Pipe != nil {
stdin, err = cmdExec.StdinPipe()
if err != nil {
return nil, errors.Wrapf(err, "error while getting STDIN pipe for command: %s", strings.Join(commandLine, " "))
}
}

err = cmdExec.Start()
if err != nil {
return nil, errors.Wrapf(err, "error while getting starting restic command: %s", strings.Join(commandLine, " "))
}
if outputToPipe {
for done := range cmd.ReadingDone {
if done {
cancelFunc()
break
}
}
} else if cmd.Pipe != nil {
_, pipeErr = io.Copy(stdin, cmd.Pipe)
cmd.ReadingDone <- true
close(cmd.ReadingDone)
if pipeErr != nil {
cancelFunc()
}
_ = stdin.Close()
}
err = cmdExec.Wait()
cancelFunc()
if ctx != nil && ctx.Err() != nil {
return outBuffer.Bytes(), fmt.Errorf("failed to execute command: timed out or canceled")
}

if pipeErr != nil {
return outBuffer.Bytes(), fmt.Errorf("failed to pipe data into STDIN for command: %s", err)
}
if err != nil {
return out, fmt.Errorf("failed to execute command: %s", err)
return outBuffer.Bytes(), fmt.Errorf("failed to execute command: %s", err)
}

log.WithField("command", strings.Join(commandLine, " ")).Debug("successfully executed command")
return out, nil
return outBuffer.Bytes(), nil
}

// GzipFile compresses a file with gzip and returns the path of the created archive
Expand Down
19 changes: 14 additions & 5 deletions pkg/cli/types.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package cli

import (
"io"
"sync"
)

const GzipSuffix = ".gz"
const DoStdinBackupKey = "doPipingBackup"

type CommandType struct {
Binary string
Command string
Args []string
Nice *int // https://linux.die.net/man/1/nice
IONice *int // https://linux.die.net/man/1/ionice
Binary string
Command string
Args []string
Pipe io.Reader // TODO: Remove when --stdin-command was added to restic
PipeReady *sync.Cond // TODO: Remove when --stdin-command was added to restic
ReadingDone chan bool // TODO: Remove when --stdin-command was added to restic
Nice *int // https://linux.die.net/man/1/nice
IONice *int // https://linux.die.net/man/1/ionice
}

type PipedCommandsPids struct {
Expand Down
9 changes: 7 additions & 2 deletions pkg/restic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package restic
import (
"context"
"fmt"
"github.com/mittwald/brudi/pkg/cli"
"github.com/spf13/viper"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -36,6 +38,9 @@ func NewResticClient(logger *log.Entry, hostname string, backupPaths ...string)
if err != nil {
return nil, errors.WithStack(err)
}
if viper.GetBool(cli.DoStdinBackupKey) {
conf.Backup.Paths = nil
}

if (conf.Backup.Flags.Host) == "" {
conf.Backup.Flags.Host = hostname
Expand All @@ -51,7 +56,7 @@ func NewResticClient(logger *log.Entry, hostname string, backupPaths ...string)
}, nil
}

func (c *Client) DoResticBackup(ctx context.Context) error {
func (c *Client) DoResticBackup(ctx context.Context, backupDataCmd *cli.CommandType) error {
c.Logger.Info("running 'restic backup'")

_, err := initBackup(ctx, c.Config.Global)
Expand All @@ -64,7 +69,7 @@ func (c *Client) DoResticBackup(ctx context.Context) error {
}

var out []byte
_, out, err = CreateBackup(ctx, c.Config.Global, c.Config.Backup, true)
_, out, err = CreateBackup(ctx, c.Config.Global, c.Config.Backup, true, backupDataCmd)
if err != nil {
return errors.WithStack(fmt.Errorf("error while running restic backup: %s - %s", err.Error(), out))
}
Expand Down
47 changes: 33 additions & 14 deletions pkg/restic/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"path"
"strings"
"time"

Expand Down Expand Up @@ -31,7 +32,7 @@ var (
func initBackup(ctx context.Context, globalOpts *GlobalOptions) ([]byte, error) {
cmd := newCommand("init", cli.StructToCLI(globalOpts)...)

out, err := cli.RunWithTimeout(ctx, cmd, cmdTimeout)
out, err := cli.RunWithTimeout(ctx, &cmd, false, cmdTimeout)
if err != nil {
// s3 init-check
if strings.Contains(string(out), "config already initialized") {
Expand Down Expand Up @@ -80,7 +81,7 @@ func parseSnapshotOut(jsonLog []byte) (BackupResult, error) {
}

// CreateBackup executes "restic backup" and returns the parent snapshot id (if available) and the snapshot id
func CreateBackup(ctx context.Context, globalOpts *GlobalOptions, backupOpts *BackupOptions, unlock bool) (BackupResult, []byte, error) {
func CreateBackup(ctx context.Context, globalOpts *GlobalOptions, backupOpts *BackupOptions, unlock bool, backupDataCmd *cli.CommandType) (BackupResult, []byte, error) {
var out []byte
var err error

Expand All @@ -98,11 +99,29 @@ func CreateBackup(ctx context.Context, globalOpts *GlobalOptions, backupOpts *Ba

var args []string
args = cli.StructToCLI(globalOpts)
if backupDataCmd != nil {
backupOpts.Paths = nil
if backupOpts.Flags.StdinFilename == "" {
binName := path.Base(backupDataCmd.Binary)
if binName == "." || binName == "/" {
backupOpts.Flags.StdinFilename = "stdin-backup-file"
} else {
backupOpts.Flags.StdinFilename = fmt.Sprintf("%s-file", binName)
}
}
// TODO: Change back when --stdin-command was added to restic
// args = append(args, "--stdin-from-command", strings.Join(cli.ParseCommandLine(*backupDataCmd), " "))
backupOpts.Flags.Stdin = true
}
args = append(args, cli.StructToCLI(backupOpts)...)

cmd := newCommand("backup", args...)
if backupDataCmd != nil {
cmd.Pipe = backupDataCmd.Pipe
cmd.ReadingDone = backupDataCmd.ReadingDone
}

out, err = cli.RunWithTimeout(ctx, cmd, cmdTimeout)
out, err = cli.RunWithTimeout(ctx, &cmd, false, cmdTimeout)
if err != nil {
return BackupResult{}, out, err
}
Expand Down Expand Up @@ -141,7 +160,7 @@ func Ls(ctx context.Context, glob *GlobalOptions, opts *LsOptions) ([]LsResult,
args = append(args, cli.StructToCLI(opts)...)
cmd := newCommand("ls", args...)

out, err := cli.Run(ctx, cmd)
out, err := cli.Run(ctx, &cmd, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -226,7 +245,7 @@ func GetSnapshotSize(ctx context.Context, snapshotIDs []string) (size uint64) {
}
cmd := newCommand("stats", cli.StructToCLI(&opts)...)

out, err := cli.Run(ctx, cmd)
out, err := cli.Run(ctx, &cmd, false)
if err != nil {
return
}
Expand Down Expand Up @@ -267,7 +286,7 @@ func GetSnapshotSizeByPath(ctx context.Context, snapshotID, path string) (size u
func ListSnapshots(ctx context.Context, opts *SnapshotOptions) ([]Snapshot, error) {
cmd := newCommand("snapshots", cli.StructToCLI(&opts)...)

out, err := cli.Run(ctx, cmd)
out, err := cli.Run(ctx, &cmd, false)
if err != nil {
return nil, err
}
Expand All @@ -282,7 +301,7 @@ func ListSnapshots(ctx context.Context, opts *SnapshotOptions) ([]Snapshot, erro
// Find executes "restic find"
func Find(ctx context.Context, opts *FindOptions) ([]FindResult, error) {
cmd := newCommand("find", cli.StructToCLI(&opts)...)
out, err := cli.Run(ctx, cmd)
out, err := cli.Run(ctx, &cmd, false)
if err != nil {
return nil, err
}
Expand All @@ -298,7 +317,7 @@ func Find(ctx context.Context, opts *FindOptions) ([]FindResult, error) {
// Check executes "restic check"
func Check(ctx context.Context, flags *CheckFlags) ([]byte, error) {
cmd := newCommand("check", cli.StructToCLI(flags)...)
return cli.Run(ctx, cmd)
return cli.Run(ctx, &cmd, false)
}

// Forget executes "restic forget"
Expand All @@ -320,7 +339,7 @@ func Forget(
Args: args,
}

out, err := cli.Run(ctx, cmd)
out, err := cli.Run(ctx, &cmd, false)
if err != nil {
return nil, out, err
}
Expand Down Expand Up @@ -349,7 +368,7 @@ func Forget(
func Prune(ctx context.Context, globalOpts *GlobalOptions) ([]byte, error) {
cmd := newCommand("prune", cli.StructToCLI(globalOpts)...)

return cli.Run(ctx, cmd)
return cli.Run(ctx, &cmd, false)
}

// RebuildIndex executes "restic rebuild-index"
Expand All @@ -363,7 +382,7 @@ func RebuildIndex(ctx context.Context) ([]byte, error) {
Nice: &nice,
IONice: &ionice,
}
return cli.Run(ctx, cmd)
return cli.Run(ctx, &cmd, false)
}

// RestoreBackup executes "restic restore"
Expand All @@ -385,7 +404,7 @@ func RestoreBackup(ctx context.Context, glob *GlobalOptions, opts *RestoreOption

cmd := newCommand("restore", args...)

return cli.Run(ctx, cmd)
return cli.Run(ctx, &cmd, false)
}

// Unlock executes "restic unlock"
Expand All @@ -395,12 +414,12 @@ func Unlock(ctx context.Context, globalOpts *GlobalOptions, unlockOpts *UnlockOp
args = append(args, cli.StructToCLI(unlockOpts)...)
cmd := newCommand("unlock", args...)

return cli.Run(ctx, cmd)
return cli.Run(ctx, &cmd, false)
}

// Tag executes "restic tag"
func Tag(ctx context.Context, opts *TagOptions) ([]byte, error) {
cmd := newCommand("tag", cli.StructToCLI(opts)...)

return cli.Run(ctx, cmd)
return cli.Run(ctx, &cmd, false)
}
Loading
Loading