Skip to content

Commit

Permalink
br: add a new CRD for compact backup (#5822)
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR authored Jan 13, 2025
1 parent bcf2c89 commit b80e1f9
Show file tree
Hide file tree
Showing 42 changed files with 15,927 additions and 8,243 deletions.
4 changes: 2 additions & 2 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d

var errs []error

backupFullPath, err := util.GetStoragePath(backup)
backupFullPath, err := util.GetStoragePath(&backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand Down Expand Up @@ -506,7 +506,7 @@ func (bm *Manager) performLogBackup(ctx context.Context, backup *v1alpha1.Backup
// startLogBackup starts log backup.
func (bm *Manager) startLogBackup(ctx context.Context, backup *v1alpha1.Backup) (*controller.BackupUpdateStatus, string, error) {
started := time.Now()
backupFullPath, err := util.GetStoragePath(backup)
backupFullPath, err := util.GetStoragePath(&backup.Spec.StorageProvider)
if err != nil {
klog.Errorf("Get backup full path of cluster %s failed, err: %s", bm, err)
return nil, "GetBackupRemotePathFailed", err
Expand Down
5 changes: 2 additions & 3 deletions cmd/backup-manager/app/clean/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"github.com/agiledragon/gomonkey/v2"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
. "github.com/onsi/gomega"
"gocloud.dev/blob"
"gocloud.dev/blob/driver"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/util"
"gocloud.dev/blob"
"gocloud.dev/blob/driver"
)

func TestCleanBRRemoteBackupDataOnce(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cmd/backup-manager/app/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewBackupMgrCommand() *cobra.Command {
cmds.AddCommand(NewRestoreCommand())
cmds.AddCommand(NewImportCommand())
cmds.AddCommand(NewCleanCommand())
cmds.AddCommand(NewCompactCommand())
return cmds
}

Expand Down
70 changes: 70 additions & 0 deletions cmd/backup-manager/app/cmd/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact/options"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/cache"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

func NewCompactCommand() *cobra.Command {
opts := options.CompactOpts{}

cmd := &cobra.Command{
Use: "compact",
Short: "Compact log backup.",
Run: func(cmd *cobra.Command, args []string) {
util.ValidCmdFlags(cmd.CommandPath(), cmd.LocalFlags())
cmdutil.CheckErr(runCompact(opts, kubecfg))
},
}

cmd.Flags().StringVar(&opts.Namespace, "namespace", "", "Backup CR's namespace")
cmd.Flags().StringVar(&opts.ResourceName, "resourceName", "", "Backup CRD object name")
return cmd
}

func runCompact(compactOpts options.CompactOpts, kubecfg string) error {
kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
if err != nil {
return err
}
options := []informers.SharedInformerOption{
informers.WithNamespace(compactOpts.Namespace),
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cli, constants.ResyncDuration, options...)
recorder := util.NewEventRecorder(kubeCli, "compact-manager")
compactInformer := informerFactory.Pingcap().V1alpha1().CompactBackups()
statusUpdater := controller.NewCompactStatusUpdater(recorder, compactInformer.Lister(), cli)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go informerFactory.Start(ctx.Done())

// waiting for the shared informer's store has synced.
cache.WaitForCacheSync(ctx.Done(), compactInformer.Informer().HasSynced)

// klog.Infof("start to process backup %s", compactOpts.String())
cm := compact.NewManager(compactInformer.Lister(), statusUpdater, compactOpts)
return cm.ProcessCompact()
}
229 changes: 229 additions & 0 deletions cmd/backup-manager/app/compact/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package compact

import (
"bytes"
"context"
"encoding/json"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact/options"
backuputil "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
pkgutil "github.com/pingcap/tidb-operator/pkg/backup/util"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/util"
"k8s.io/klog/v2"
)

const (
messageCompactionDone = "Finishing compaction."
messageCompactAborted = "Compaction aborted."
)

// logLine is line of JSON log.
// It just extracted the message from the JSON and keeps the origin json bytes.
// So you may extract fields from it by `json.Unmarshal(l.Raw, ...)`.
type logLine struct {
Message string `json:"Message"`
Raw json.RawMessage `json:"-"`
}

// Manager mainly used to manage backup related work
type Manager struct {
compact *v1alpha1.CompactBackup
resourceLister listers.CompactBackupLister
statusUpdater controller.CompactStatusUpdaterInterface
options options.CompactOpts
}

// NewManager return a Manager
func NewManager(
lister listers.CompactBackupLister,
statusUpdater controller.CompactStatusUpdaterInterface,
compactOpts options.CompactOpts) *Manager {
compact, err := lister.CompactBackups(compactOpts.Namespace).Get(compactOpts.ResourceName)
if err != nil {
klog.Errorf("can't find compact %s:%s CRD object, err: %v", compactOpts.Namespace, compactOpts.ResourceName, err)
return nil
}
return &Manager{
compact,
lister,
statusUpdater,
compactOpts,
}
}

func (cm *Manager) brBin() string {
return filepath.Join(util.BRBinPath, "br")
}

func (cm *Manager) kvCtlBin() string {
return filepath.Join(util.KVCTLBinPath, "tikv-ctl")
}

// ProcessBackup used to process the backup logic
func (cm *Manager) ProcessCompact() error {
var err error
ctx, cancel := backuputil.GetContextForTerminationSignals(cm.options.ResourceName)
defer cancel()

compact, err := cm.resourceLister.CompactBackups(cm.options.Namespace).Get(cm.options.ResourceName)
defer func() {
cm.statusUpdater.OnFinish(ctx, cm.compact, err)
}()
if err != nil {
return errors.New("backup not found")
}
if err = options.ParseCompactOptions(compact, &cm.options); err != nil {
return errors.Annotate(err, "failed to parse compact options")
}

b64, err := cm.base64ifyStorage(ctx)
if err != nil {
return errors.Annotate(err, "failed to base64ify storage")
}
return cm.runCompaction(ctx, b64)
}

func (cm *Manager) base64ifyStorage(ctx context.Context) (string, error) {
brCmd, err := cm.base64ifyCmd(ctx)
if err != nil {
return "", err
}
out, err := brCmd.Output()
if err != nil {
eerr, ok := err.(*exec.ExitError)
if !ok {
return "", errors.Annotatef(err, "failed to execute BR with args %v", brCmd.Args)
}
klog.Warningf("Failed to execute base64ify; stderr = %s", string(eerr.Stderr))
return "", errors.Annotatef(err, "failed to execute BR with args %v", brCmd.Args)
}
out = bytes.Trim(out, "\r\n \t")
return string(out), nil
}

func (cm *Manager) base64ifyCmd(ctx context.Context) (*exec.Cmd, error) {
br := cm.brBin()
args := []string{
"operator",
"base64ify",
}
StorageOpts, err := pkgutil.GenStorageArgsForFlag(cm.compact.Spec.StorageProvider, "storage")
if err != nil {
return nil, err
}
args = append(args, StorageOpts...)
return exec.CommandContext(ctx, br, args...), nil
}

func (cm *Manager) runCompaction(ctx context.Context, base64Storage string) (err error) {
cmd := cm.compactCmd(ctx, base64Storage)

// tikvLog is used to capture the log from tikv-ctl, which is sent to stderr by default
tikvLog, err := cmd.StderrPipe()
if err != nil {
return errors.Annotate(err, "failed to create stderr pipe for compact")
}
if err := cmd.Start(); err != nil {
return errors.Annotate(err, "failed to start compact")
}

cm.statusUpdater.OnStart(ctx, cm.compact)
err = cm.processCompactionLogs(ctx, io.TeeReader(tikvLog, os.Stdout))
if err != nil {
return err
}

return cmd.Wait()
}

func (cm *Manager) compactCmd(ctx context.Context, base64Storage string) *exec.Cmd {
ctl := cm.kvCtlBin()
// You should not change the log configuration here, it should sync with the upstream setting
args := []string{
"--log-level",
"INFO",
"--log-format",
"json",
"compact-log-backup",
"--storage-base64",
base64Storage,
"--from",
strconv.FormatUint(cm.options.FromTS, 10),
"--until",
strconv.FormatUint(cm.options.UntilTS, 10),
"-N",
strconv.FormatUint(cm.options.Concurrency, 10),
}
return exec.CommandContext(ctx, ctl, args...)
}

func (cm *Manager) processCompactionLogs(ctx context.Context, logStream io.Reader) error {
dec := json.NewDecoder(logStream)

for dec.More() {
if ctx.Err() != nil {
return ctx.Err()
}

var raw json.RawMessage
if err := dec.Decode(&raw); err != nil {
return errors.Annotate(err, "failed to decode raw log line")
}

var line logLine
if err := json.Unmarshal(raw, &line); err != nil {
return errors.Annotate(err, "failed to decode the line of log")
}
line.Raw = raw

if err := cm.processLogLine(ctx, line); err != nil {
return err
}
}

return nil
}

func (cm *Manager) processLogLine(ctx context.Context, l logLine) error {
switch l.Message {
case messageCompactionDone:
var prog controller.Progress
if err := json.Unmarshal(l.Raw, &prog); err != nil {
return errors.Annotatef(err, "failed to decode progress message: %s", string(l.Raw))
}
cm.statusUpdater.OnProgress(ctx, cm.compact, prog)
return nil
case messageCompactAborted:
errContainer := struct {
Err string `json:"err"`
}{}
if err := json.Unmarshal(l.Raw, &errContainer); err != nil {
return errors.Annotatef(err, "failed to decode error message: %s", string(l.Raw))
}
return errors.New(errContainer.Err)
default:
return nil
}
}
Loading

0 comments on commit b80e1f9

Please sign in to comment.