From fa017fba30c1dac4f04f53b0d9a4c78ac3fcc455 Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Mon, 3 Apr 2023 20:57:11 +0530 Subject: [PATCH 1/7] Fix cleanup of temporary directory used for restoration --- .ci/unit_test | 1 + pkg/compactor/compactor.go | 44 +++--- pkg/compactor/compactor_suite_test.go | 26 ++-- pkg/compactor/compactor_test.go | 54 ++++--- pkg/initializer/initializer.go | 20 +-- pkg/initializer/validator/datavalidator.go | 2 +- pkg/miscellaneous/miscellaneous.go | 2 +- pkg/snapshot/restorer/restorer.go | 149 ++++++++++++------- pkg/snapshot/restorer/restorer_suite_test.go | 18 ++- pkg/snapshot/restorer/restorer_test.go | 142 +++++++++--------- pkg/snapshot/restorer/types_test.go | 3 +- pkg/types/restorer.go | 14 +- 12 files changed, 274 insertions(+), 201 deletions(-) diff --git a/.ci/unit_test b/.ci/unit_test index 854da2f67..31d30caf8 100755 --- a/.ci/unit_test +++ b/.ci/unit_test @@ -71,6 +71,7 @@ if [ -z $COVER ] || [ "$COVER" = false ] ; then ginkgo -race -trace $GINKGO_COMMON_FLAGS -gcflags=all=-d=checkptr=0 --randomizeAllSpecs --randomizeSuites --failOnPending --skip="NEGATIVE\:.*" ${TEST_PACKAGES} if [ "$RUN_NEGATIVE" = "true" ] ; then + echo "[INFO] Running negative tests now..." 
#run negative scenario in a sequenced manner (removed failOnPending as one spec in restore test is marked as 'X' for excluding) ginkgo -race -trace $GINKGO_COMMON_FLAGS -gcflags=all=-d=checkptr=0 --focus="NEGATIVE\:.*" ${TEST_PACKAGES} fi diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 1df526ee5..fd3104bc5 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -55,40 +55,38 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions) cp.logger.Info("Start compacting") // Deepcopy restoration options ro to avoid any mutation of the passing object - cmpctOptions := opts.RestoreOptions.DeepCopy() + compactorRestoreOptions := opts.RestoreOptions.DeepCopy() - // If no basesnapshot is found, abort compaction as there would be nothing to compact - if cmpctOptions.BaseSnapshot == nil { + // If no base snapshot is found, abort compaction as there would be nothing to compact + if compactorRestoreOptions.BaseSnapshot == nil { cp.logger.Error("No base snapshot found. Nothing is available for compaction") return nil, fmt.Errorf("no base snapshot found. 
Nothing is available for compaction") } - // Set a temporary etcd data directory for embedded etcd - prefix := cmpctOptions.Config.RestoreDataDir - if prefix == "" { - prefix = "/tmp" - } - cmpctDir, err := os.MkdirTemp(prefix, "compactor-") + cp.logger.Infof("Creating temporary etcd directory %s for restoration.", compactorRestoreOptions.Config.DataDir) + err := os.MkdirAll(compactorRestoreOptions.Config.DataDir, 0700) if err != nil { cp.logger.Errorf("Unable to create temporary etcd directory for compaction: %s", err.Error()) return nil, err } - defer os.RemoveAll(cmpctDir) - - cmpctOptions.Config.RestoreDataDir = cmpctDir + defer func() { + if err := os.RemoveAll(compactorRestoreOptions.Config.DataDir); err != nil { + cp.logger.Errorf("Failed to remove temporary etcd directory %s: %v", compactorRestoreOptions.Config.DataDir, err) + } + }() // Then restore from the snapshots r := restorer.NewRestorer(cp.store, cp.logger) - embeddedEtcd, err := r.Restore(*cmpctOptions, nil) + embeddedEtcd, err := r.Restore(*compactorRestoreOptions, nil) if err != nil { return nil, fmt.Errorf("unable to restore snapshots during compaction: %v", err) } - cp.logger.Info("Restoration for compaction is over") + cp.logger.Info("Restoration for compaction is done.") // There is a possibility that restore operation may not start an embedded ETCD. 
if embeddedEtcd == nil { - embeddedEtcd, err = miscellaneous.StartEmbeddedEtcd(cp.logger, cmpctOptions) + embeddedEtcd, err = miscellaneous.StartEmbeddedEtcd(cp.logger, compactorRestoreOptions) if err != nil { return nil, err } @@ -104,8 +102,8 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions) // Then compact ETCD // Build Client - clientFactory := etcdutil.NewClientFactory(cmpctOptions.NewClientFactory, brtypes.EtcdConnectionConfig{ - MaxCallSendMsgSize: cmpctOptions.Config.MaxCallSendMsgSize, + clientFactory := etcdutil.NewClientFactory(compactorRestoreOptions.NewClientFactory, brtypes.EtcdConnectionConfig{ + MaxCallSendMsgSize: compactorRestoreOptions.Config.MaxCallSendMsgSize, Endpoints: ep, InsecureTransport: true, }) @@ -150,22 +148,22 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions) } } - // Then take snapeshot of ETCD + // Then take snapshot of ETCD snapshotReqCtx, cancel := context.WithTimeout(ctx, opts.SnapshotTimeout.Duration) defer cancel() // Determine suffix of compacted snapshot that will be result of this compaction - suffix := cmpctOptions.BaseSnapshot.CompressionSuffix - if len(cmpctOptions.DeltaSnapList) > 0 { - suffix = cmpctOptions.DeltaSnapList[cmpctOptions.DeltaSnapList.Len()-1].CompressionSuffix + suffix := compactorRestoreOptions.BaseSnapshot.CompressionSuffix + if len(compactorRestoreOptions.DeltaSnapList) > 0 { + suffix = compactorRestoreOptions.DeltaSnapList[compactorRestoreOptions.DeltaSnapList.Len()-1].CompressionSuffix } isCompressed, compressionPolicy, err := compressor.IsSnapshotCompressed(suffix) if err != nil { - return nil, fmt.Errorf("unable to determine if snapshot is compressed: %v", cmpctOptions.BaseSnapshot.CompressionSuffix) + return nil, fmt.Errorf("unable to determine if snapshot is compressed: %v", compactorRestoreOptions.BaseSnapshot.CompressionSuffix) } - isFinal := cmpctOptions.BaseSnapshot.IsFinal + isFinal := 
compactorRestoreOptions.BaseSnapshot.IsFinal cc := &compressor.CompressionConfig{Enabled: isCompressed, CompressionPolicy: compressionPolicy} snapshot, err := etcdutil.TakeAndSaveFullSnapshot(snapshotReqCtx, clientMaintenance, cp.store, etcdRevision, cc, suffix, isFinal, cp.logger) diff --git a/pkg/compactor/compactor_suite_test.go b/pkg/compactor/compactor_suite_test.go index 523635bca..a387e46f3 100644 --- a/pkg/compactor/compactor_suite_test.go +++ b/pkg/compactor/compactor_suite_test.go @@ -36,13 +36,13 @@ const ( ) var ( - testSuitDir, testEtcdDir, testSnapshotDir string - testCtx = context.Background() - logger = logrus.New().WithField("suite", "compactor") - etcd *embed.Etcd - err error - keyTo int - endpoints []string + testSuiteDir, testEtcdDir, testSnapshotDir string + testCtx = context.Background() + logger = logrus.New().WithField("suite", "compactor") + etcd *embed.Etcd + err error + keyTo int + endpoints []string ) func TestCompactor(t *testing.T) { @@ -56,13 +56,13 @@ var _ = SynchronizedBeforeSuite(func() []byte { ) // Create a directory for compaction test cases - testSuitDir, err = os.MkdirTemp("/tmp", "compactor-test") + testSuiteDir, err = os.MkdirTemp("/tmp", "compactor-test-") Expect(err).ShouldNot(HaveOccurred()) // Directory for the main ETCD process - testEtcdDir := fmt.Sprintf("%s/etcd/default.etcd", testSuitDir) + testEtcdDir := fmt.Sprintf("%s/etcd/default.etcd", testSuiteDir) // Directory for storing the backups - testSnapshotDir := fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir) + testSnapshotDir := fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuiteDir) logger.Infof("ETCD Directory is: %s", testEtcdDir) logger.Infof("Snapshot Directory is: %s", testSnapshotDir) @@ -91,7 +91,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) - // Wait unitil the populator finishes with 
populating ETCD + // Wait until the populator finishes with populating ETCD wg.Wait() keyTo = resp.KeyTo @@ -106,7 +106,7 @@ func cleanUp() { etcd.Server.Stop() etcd.Close() - logger.Infof("All tests are done for compactor suite. %s is being removed.", testSuitDir) - err = os.RemoveAll(testSuitDir) + logger.Infof("All tests are done for compactor suite. %s is being removed.", testSuiteDir) + err = os.RemoveAll(testSuiteDir) Expect(err).ShouldNot(HaveOccurred()) } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 1e8cee848..2af498290 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -55,20 +55,27 @@ var _ = Describe("Running Compactor", func() { var compactorConfig *brtypes.CompactorConfig var compactOptions *brtypes.CompactOptions var compactedSnapshot *brtypes.Snapshot - var restoreDir string + var tempRestoreDir string BeforeEach(func() { - dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir) + dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuiteDir) store, err = snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: dir, Provider: "Local"}) Expect(err).ShouldNot(HaveOccurred()) fmt.Println("The store where compaction will save snapshot is: ", store) + tempDataDir, err := os.MkdirTemp(testSuiteDir, "compacted.etcd-") + Expect(err).ShouldNot(HaveOccurred()) + + tempRestorationSnapshotsDir, err := os.MkdirTemp(testSuiteDir, "temp-snapshots-") + Expect(err).ShouldNot(HaveOccurred()) + cptr = compactor.NewCompactor(store, logger, nil) restoreOpts = &brtypes.RestoreOptions{ Config: &brtypes.RestorationConfig{ InitialCluster: restoreCluster, InitialClusterToken: restoreClusterToken, - RestoreDataDir: "/tmp", + DataDir: tempDataDir, + TempDir: tempRestorationSnapshotsDir, InitialAdvertisePeerURLs: restorePeerURLs, Name: restoreName, SkipHashCheck: skipHashCheck, @@ -93,11 +100,11 @@ var _ = Describe("Running Compactor", func() { } }) - Context("with defragmention allowed", func() { + 
Context("with defragmentation allowed", func() { AfterEach(func() { - _, err := os.Stat(restoreDir) + _, err := os.Stat(tempRestoreDir) if err == nil { - os.RemoveAll(restoreDir) + os.RemoveAll(tempRestoreDir) } store.Delete(*compactedSnapshot) }) @@ -105,14 +112,14 @@ var _ = Describe("Running Compactor", func() { It("should create a snapshot", func() { restoreOpts.Config.MaxFetchers = 4 - // Fetch latest snapshots + // Fetch the latest set of snapshots baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) restoreOpts.BaseSnapshot = baseSnapshot restoreOpts.DeltaSnapList = deltaSnapList - // Take the compacted full snapshot with defragmnetation allowed + // Take the compacted full snapshot with defragmentation allowed _, err = cptr.Compact(testCtx, compactOptions) Expect(err).ShouldNot(HaveOccurred()) @@ -130,7 +137,7 @@ var _ = Describe("Running Compactor", func() { It("should restore from compacted snapshot", func() { restoreOpts.Config.MaxFetchers = 4 - // Fetch latest snapshots + // Fetch the latest set of snapshots baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) @@ -151,10 +158,15 @@ var _ = Describe("Running Compactor", func() { Expect(size).ShouldNot(BeZero()) // Restore from the compacted snapshot - restoreDir, err = os.MkdirTemp("/tmp", "restore-") + tempRestoreDir, err = os.MkdirTemp(testSuiteDir, "restore-test-") Expect(err).ShouldNot(HaveOccurred()) - restoreOpts.Config.RestoreDataDir = restoreDir + defer func() { + err := os.RemoveAll(tempRestoreDir) + Expect(err).ShouldNot(HaveOccurred()) + }() + + restoreOpts.Config.DataDir = tempRestoreDir restoreOpts.BaseSnapshot = compactedSnapshot restoreOpts.DeltaSnapList = deltaSnapList @@ -164,22 +176,22 @@ var _ = Describe("Running Compactor", func() { err = rstr.RestoreAndStopEtcd(*restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = 
utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) - Context("with defragmention not allowed", func() { + Context("with defragmentation not allowed", func() { AfterEach(func() { - _, err := os.Stat(restoreDir) + _, err := os.Stat(tempRestoreDir) if err != nil { - os.RemoveAll(restoreDir) + os.RemoveAll(tempRestoreDir) } store.Delete(*compactedSnapshot) }) It("should create a snapshot", func() { restoreOpts.Config.MaxFetchers = 4 - // Fetch latest snapshots + // Fetch the latest set of snapshots baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) @@ -205,7 +217,7 @@ var _ = Describe("Running Compactor", func() { It("should restore from compacted snapshot", func() { restoreOpts.Config.MaxFetchers = 4 - // Fetch latest snapshots + // Fetch the latest set of snapshots baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) @@ -227,10 +239,10 @@ var _ = Describe("Running Compactor", func() { Expect(size).ShouldNot(BeZero()) // Restore from the compacted snapshot - restoreDir, err = os.MkdirTemp("/tmp", "restore-") + tempRestoreDir, err = os.MkdirTemp(testSuiteDir, "restore-test-") Expect(err).ShouldNot(HaveOccurred()) - restoreOpts.Config.RestoreDataDir = restoreDir + restoreOpts.Config.DataDir = tempRestoreDir restoreOpts.BaseSnapshot = compactedSnapshot restoreOpts.DeltaSnapList = deltaSnapList @@ -240,11 +252,11 @@ var _ = Describe("Running Compactor", func() { err = rstr.RestoreAndStopEtcd(*restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) 
Expect(err).ShouldNot(HaveOccurred()) }) }) - Context("with no basesnapshot in backup store", func() { + Context("with no base snapshot in backup store", func() { It("should not run compaction", func() { restoreOpts.Config.MaxFetchers = 4 diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index 924bc88b8..9469028ae 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -114,21 +114,21 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6 } // NewInitializer creates an etcd initializer object. -func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.SnapstoreConfig, etcdConnectionConfig *brtypes.EtcdConnectionConfig, logger *logrus.Logger) *EtcdInitializer { +func NewInitializer(restoreOptions *brtypes.RestoreOptions, snapstoreConfig *brtypes.SnapstoreConfig, etcdConnectionConfig *brtypes.EtcdConnectionConfig, logger *logrus.Logger) *EtcdInitializer { zapLogger, _ := zap.NewProduction() etcdInit := &EtcdInitializer{ Config: &Config{ SnapstoreConfig: snapstoreConfig, - RestoreOptions: options, + RestoreOptions: restoreOptions, EtcdConnectionConfig: etcdConnectionConfig, }, Validator: &validator.DataValidator{ Config: &validator.Config{ - DataDir: options.Config.RestoreDataDir, - EmbeddedEtcdQuotaBytes: options.Config.EmbeddedEtcdQuotaBytes, + DataDir: restoreOptions.Config.DataDir, + EmbeddedEtcdQuotaBytes: restoreOptions.Config.EmbeddedEtcdQuotaBytes, SnapstoreConfig: snapstoreConfig, }, - OriginalClusterSize: options.OriginalClusterSize, + OriginalClusterSize: restoreOptions.OriginalClusterSize, Logger: logger, ZapLogger: zapLogger, }, @@ -144,7 +144,7 @@ func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.Sn func (e *EtcdInitializer) restoreCorruptData() (bool, error) { logger := e.Logger tempRestoreOptions := *(e.Config.RestoreOptions.DeepCopy()) - dataDir := tempRestoreOptions.Config.RestoreDataDir + dataDir := 
tempRestoreOptions.Config.DataDir if e.Config.SnapstoreConfig == nil || len(e.Config.SnapstoreConfig.Provider) == 0 { logger.Warnf("No snapstore storage provider configured.") @@ -170,9 +170,9 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) { tempRestoreOptions.BaseSnapshot = baseSnap tempRestoreOptions.DeltaSnapList = deltaSnapList - tempRestoreOptions.Config.RestoreDataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.RestoreDataDir, "part") + tempRestoreOptions.Config.DataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.DataDir, "part") - if err := e.removeDir(tempRestoreOptions.Config.RestoreDataDir); err != nil { + if err := e.removeDir(tempRestoreOptions.Config.DataDir); err != nil { return false, fmt.Errorf("failed to delete previous temporary data directory: %v", err) } @@ -196,7 +196,7 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) { // and false if directory removal failed or if directory // never existed (bootstrap case) func (e *EtcdInitializer) restoreWithEmptySnapstore() (bool, error) { - dataDir := e.Config.RestoreOptions.Config.RestoreDataDir + dataDir := e.Config.RestoreOptions.Config.DataDir e.Logger.Infof("Removing directory(%s) since snapstore is empty.", dataDir) // If data directory doesn't exist, it means we are bootstrapping @@ -248,7 +248,7 @@ func (e *EtcdInitializer) restoreInMultiNode(ctx context.Context) error { return fmt.Errorf("unable to remove the member %v", err) } - if err := e.removeDir(e.Config.RestoreOptions.Config.RestoreDataDir); err != nil { + if err := e.removeDir(e.Config.RestoreOptions.Config.DataDir); err != nil { return fmt.Errorf("unable to remove the data-dir %v", err) } diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index 7ca0fba8c..9e654bbb6 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -365,7 +365,7 @@ func (d *DataValidator) checkFullRevisionConsistency(dataDir 
string, latestSnaps d.Logger.Info("Starting embedded etcd server...") ro := &brtypes.RestoreOptions{ Config: &brtypes.RestorationConfig{ - RestoreDataDir: dataDir, + DataDir: dataDir, EmbeddedEtcdQuotaBytes: d.Config.EmbeddedEtcdQuotaBytes, MaxRequestBytes: defaultMaxRequestBytes, MaxTxnOps: defaultMaxTxnOps, diff --git a/pkg/miscellaneous/miscellaneous.go b/pkg/miscellaneous/miscellaneous.go index a3cb5c1b8..49d0e652e 100644 --- a/pkg/miscellaneous/miscellaneous.go +++ b/pkg/miscellaneous/miscellaneous.go @@ -152,7 +152,7 @@ func getStructuredBackupList(snapList brtypes.SnapList) []backup { // StartEmbeddedEtcd starts the embedded etcd server. func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*embed.Etcd, error) { cfg := embed.NewConfig() - cfg.Dir = filepath.Join(ro.Config.RestoreDataDir) + cfg.Dir = filepath.Join(ro.Config.DataDir) DefaultListenPeerURLs := "http://localhost:0" DefaultListenClientURLs := "http://localhost:0" DefaultInitialAdvertisePeerURLs := "http://localhost:0" diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 4a34d659e..a15da890a 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -26,6 +26,7 @@ import ( "path" "path/filepath" "reflect" + "strings" "sync" "time" @@ -56,11 +57,6 @@ import ( "go.uber.org/zap" ) -const ( - tmpDir = "/tmp" - tmpEventsDataFilePrefix = "etcd-restore-" -) - // Restorer is a struct for etcd data directory restorer type Restorer struct { logger *logrus.Entry @@ -90,15 +86,31 @@ func (r *Restorer) RestoreAndStopEtcd(ro brtypes.RestoreOptions, m member.Contro return err } -// Restore restore the etcd data directory as per specified restore options but returns the ETCD server that it statrted. +// Restore restores the etcd data directory as per specified restore options but returns the ETCD server that it statrted. 
func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.Etcd, error) { if err := r.restoreFromBaseSnapshot(ro); err != nil { return nil, fmt.Errorf("failed to restore from the base snapshot :%v", err) } + if len(ro.DeltaSnapList) == 0 { r.logger.Infof("No delta snapshots present over base snapshot.") return nil, nil } + r.logger.Infof("Attempting to apply %d delta snapshots for restoration.", len(ro.DeltaSnapList)) + + r.logger.Infof("Creating temporary directory for persisting delta snapshots locally.") + + err := os.MkdirAll(ro.Config.TempDir, 0700) + if err != nil { + return nil, err + } + + defer func() { + if err := os.RemoveAll(ro.Config.TempDir); err != nil { + r.logger.Errorf("Failed to remove restoration temp directory %s: %v", ro.Config.TempDir, err) + } + }() + r.logger.Infof("Starting embedded etcd server...") e, err := miscellaneous.StartEmbeddedEtcd(r.logger, &ro) if err != nil { @@ -114,7 +126,11 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed. 
if err != nil { return e, err } - defer clientKV.Close() + defer func() { + if err := clientKV.Close(); err != nil { + r.logger.Errorf("Failed to close etcd KV client: %v", err) + } + }() r.logger.Infof("Applying delta snapshots...") if err := r.applyDeltaSnapshots(clientKV, ro); err != nil { @@ -155,21 +171,21 @@ func (r *Restorer) restoreFromBaseSnapshot(ro brtypes.RestoreOptions) error { return err } - memberDir := filepath.Join(ro.Config.RestoreDataDir, "member") + memberDir := filepath.Join(ro.Config.DataDir, "member") if _, err := os.Stat(memberDir); err == nil { return fmt.Errorf("member directory in data directory(%q) exists", memberDir) } walDir := filepath.Join(memberDir, "wal") - snapdir := filepath.Join(memberDir, "snap") - if err = r.makeDB(snapdir, ro.BaseSnapshot, len(cl.Members()), ro.Config.SkipHashCheck); err != nil { + snapDir := filepath.Join(memberDir, "snap") + if err = r.makeDB(snapDir, ro.BaseSnapshot, len(cl.Members()), ro.Config.SkipHashCheck); err != nil { return err } - return makeWALAndSnap(r.zapLogger, walDir, snapdir, cl, ro.Config.Name) + return makeWALAndSnap(r.zapLogger, walDir, snapDir, cl, ro.Config.Name) } // makeDB copies the database snapshot to the snapshot directory. 
-func (r *Restorer) makeDB(snapdir string, snap *brtypes.Snapshot, commit int, skipHashCheck bool) error { +func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, skipHashCheck bool) error { rc, err := r.store.Fetch(*snap) if err != nil { return err @@ -189,11 +205,11 @@ func (r *Restorer) makeDB(snapdir string, snap *brtypes.Snapshot, commit int, sk } defer rc.Close() - if err := fileutil.CreateDirAll(snapdir); err != nil { + if err := fileutil.CreateDirAll(snapDir); err != nil { return err } - dbPath := filepath.Join(snapdir, "db") + dbPath := filepath.Join(snapDir, "db") db, err := os.OpenFile(dbPath, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err @@ -281,8 +297,8 @@ func (r *Restorer) makeDB(snapdir string, snap *brtypes.Snapshot, commit int, sk return nil } -func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.RaftCluster, restoreName string) error { - if err := fileutil.CreateDirAll(waldir); err != nil { +func makeWALAndSnap(logger *zap.Logger, walDir, snapDir string, cl *membership.RaftCluster, restoreName string) error { + if err := fileutil.CreateDirAll(walDir); err != nil { return err } @@ -300,7 +316,7 @@ func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.R return err } - w, err := wal.Create(logger, waldir, metadata) + w, err := wal.Create(logger, walDir, metadata) if err != nil { return err } @@ -360,7 +376,7 @@ func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.R }, }, } - snapshotter := snap.New(logger, snapdir) + snapshotter := snap.New(logger, snapDir) if err := snapshotter.SaveSnap(raftSnap); err != nil { panic(err) } @@ -402,12 +418,12 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest go r.applySnaps(clientKV, remainingSnaps, applierInfoCh, errCh, stopCh, &wg) for f := 0; f < numFetchers; f++ { - go r.fetchSnaps(f, fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg) + go r.fetchSnaps(f, 
fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg, ro.Config.TempDir) } - for i, snap := range remainingSnaps { + for i, remainingSnap := range remainingSnaps { fetcherInfo := brtypes.FetcherInfo{ - Snapshot: *snap, + Snapshot: *remainingSnap, SnapIndex: i, } fetcherInfoCh <- fetcherInfo @@ -415,36 +431,53 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest close(fetcherInfoCh) err := <-errCh - r.cleanup(snapLocationsCh, stopCh, &wg) - if err == nil { - r.logger.Infof("Restoration complete.") - } else { + + if cleanupErr := r.cleanup(snapLocationsCh, stopCh, &wg); cleanupErr != nil { + r.logger.Errorf("Cleanup of temporary snapshots failed: %v", cleanupErr) + } + + if err != nil { r.logger.Errorf("Restoration failed.") + return err } - return err + r.logger.Infof("Restoration complete.") + + return nil } // cleanup stops all running goroutines and removes the persisted snapshot files from disk. -func (r *Restorer) cleanup(snapLocationsCh chan string, stopCh chan bool, wg *sync.WaitGroup) { - close(stopCh) +func (r *Restorer) cleanup(snapLocationsCh chan string, stopCh chan bool, wg *sync.WaitGroup) error { + var errs []error + close(stopCh) wg.Wait() - close(snapLocationsCh) for filePath := range snapLocationsCh { - if _, err := os.Stat(filePath); err == nil && !os.IsNotExist(err) { - if err = os.Remove(filePath); err != nil { - r.logger.Warnf("Unable to remove file, file: %s, err: %v", filePath, err) + if _, err := os.Stat(filePath); err != nil { + if !os.IsNotExist(err) { + errs = append(errs, fmt.Errorf("unable to stat file %s: %v", filePath, err)) } + continue } + + if err := os.Remove(filePath); err != nil { + errs = append(errs, fmt.Errorf("unable to remove file %s: %v", filePath, err)) + } + } + + if len(errs) != 0 { + r.logger.Error("Cleanup failed") + return errorArrayToError(errs) } - r.logger.Infof("Cleanup complete") + + r.logger.Info("Cleanup complete") + return nil } // fetchSnaps fetches delta snapshots 
as events and persists them onto disk. -func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.FetcherInfo, applierInfoCh chan<- brtypes.ApplierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup) { +func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.FetcherInfo, applierInfoCh chan<- brtypes.ApplierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup, tempDir string) { defer wg.Done() wg.Add(1) @@ -464,7 +497,8 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.Fet return } - eventsFilePath, err := persistDeltaSnapshot(eventsData) + tempFilePath := filepath.Join(tempDir, fetcherInfo.Snapshot.SnapName) + eventsFilePath, err := persistDeltaSnapshot(eventsData, tempFilePath) if err != nil { errCh <- fmt.Errorf("failed to persist events data for delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} @@ -514,29 +548,33 @@ func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.S break } - r.logger.Infof("Applying delta snapshot %s", path.Join(remainingSnaps[currSnapIndex].SnapDir, remainingSnaps[currSnapIndex].SnapName)) - filePath := pathList[currSnapIndex] snapName := remainingSnaps[currSnapIndex].SnapName + r.logger.Infof("Reading snapshot %s from temporary file %s", snapName, filePath) eventsData, err := os.ReadFile(filePath) if err != nil { errCh <- fmt.Errorf("failed to read events data from file for delta snapshot %s : %v", snapName, err) return } - if err = os.Remove(filePath); err != nil { - r.logger.Warnf("Unable to remove file: %s; err: %v", filePath, err) - } + events := []brtypes.Event{} if err = json.Unmarshal(eventsData, &events); err != nil { errCh <- fmt.Errorf("failed to read events from events data for delta snapshot %s : %v", snapName, err) return } + r.logger.Infof("Applying delta snapshot %s [%d/%d]", 
path.Join(remainingSnaps[currSnapIndex].SnapDir, remainingSnaps[currSnapIndex].SnapName), currSnapIndex+2, len(remainingSnaps)+1) if err := applyEventsAndVerify(clientKV, events, remainingSnaps[currSnapIndex]); err != nil { errCh <- err return } + + r.logger.Infof("Removing temporary snapshot file %s for snapshot %s", filePath, snapName) + if err = os.Remove(filePath); err != nil { + r.logger.Warnf("Unable to remove file: %s; err: %v", filePath, err) + } + nextSnapIndexToApply++ if nextSnapIndexToApply == len(remainingSnaps) { errCh <- nil // restore finished @@ -569,8 +607,8 @@ func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap brtype } // Note: Since revision in full snapshot file name might be lower than actual revision stored in snapshot. - // This is because of issue refereed below. So, as per workaround used in our logic of taking delta snapshot, - // latest revision from full snapshot may overlap with first few revision on first delta snapshot + // This is because of issue referred below. So, as per workaround used in our logic of taking delta snapshot, + // the latest revision from full snapshot may overlap with first few revision on first delta snapshot // Hence, we have to additionally take care of that. // Refer: https://github.com/coreos/etcd/issues/9037 ctx := context.TODO() @@ -662,20 +700,20 @@ func (r *Restorer) getEventsDataFromDeltaSnapshot(snap brtypes.Snapshot) ([]byte } // persistDeltaSnapshot writes delta snapshot events to disk and returns the file path for the same. 
-func persistDeltaSnapshot(data []byte) (string, error) { - tmpFile, err := os.CreateTemp(tmpDir, tmpEventsDataFilePrefix) +func persistDeltaSnapshot(data []byte, tempFilePath string) (string, error) { + tempFile, err := os.Create(tempFilePath) if err != nil { - err = fmt.Errorf("failed to create temp file") + err = fmt.Errorf("failed to create temp file %s", tempFilePath) return "", err } - defer tmpFile.Close() + defer tempFile.Close() - if _, err = tmpFile.Write(data); err != nil { - err = fmt.Errorf("failed to write events data into temp file") + if _, err = tempFile.Write(data); err != nil { + err = fmt.Errorf("failed to write events data into temp file %s", tempFilePath) return "", err } - return tmpFile.Name(), nil + return tempFile.Name(), nil } // applyEventsToEtcd performs operations in events sequentially. @@ -703,7 +741,7 @@ func applyEventsToEtcd(clientKV client.KVCloser, events []brtypes.Event) error { case mvccpb.DELETE: ops = append(ops, clientv3.OpDelete(string(ev.Kv.Key))) default: - return fmt.Errorf("Unexpected event type") + return fmt.Errorf("unexpected event type") } } _, err := clientKV.Txn(ctx).Then(ops...).Commit() @@ -722,3 +760,14 @@ func verifySnapshotRevision(clientKV client.KVCloser, snap *brtypes.Snapshot) er } return nil } + +// errorArrayToError takes an array of errors and returns a single concatenated error +func errorArrayToError(errs []error) error { + var errString string + + for _, e := range errs { + errString = fmt.Sprintf("%s\n%s", errString, e.Error()) + } + + return fmt.Errorf("%s", strings.TrimSpace(errString)) +} diff --git a/pkg/snapshot/restorer/restorer_suite_test.go b/pkg/snapshot/restorer/restorer_suite_test.go index a866f6acb..87df3d6ce 100644 --- a/pkg/snapshot/restorer/restorer_suite_test.go +++ b/pkg/snapshot/restorer/restorer_suite_test.go @@ -18,6 +18,7 @@ import ( "context" "os" "path" + "path/filepath" "sync" "testing" "time" @@ -34,18 +35,19 @@ import ( const ( outputDir = "../../../test/output" - etcdDir 
= outputDir + "/default.etcd" - snapstoreDir = outputDir + "/snapshotter.bkp" embeddedEtcdPortNo = "9089" ) var ( - testCtx = context.Background() - logger = logrus.New().WithField("suite", "restorer") - etcd *embed.Etcd - err error - keyTo int - endpoints []string + etcdDir = filepath.Join(outputDir, "default.etcd") + tempDir = filepath.Join(outputDir, "default.restore.tmp") + snapstoreDir = filepath.Join(outputDir, "snapshotter.bkp") + testCtx = context.Background() + logger = logrus.New().WithField("suite", "restorer") + etcd *embed.Etcd + err error + keyTo int + endpoints []string ) func TestRestorer(t *testing.T) { diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 1cc8123cf..8d0bd4a95 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -20,6 +20,7 @@ import ( "io" "os" "path" + "path/filepath" "strings" "sync" "time" @@ -53,7 +54,7 @@ const ( var _ = Describe("Running Restorer", func() { var ( store brtypes.SnapStore - rstr *Restorer + restorer *Restorer restorePeerURLs []string clusterUrlsMap types.URLsMap peerUrls types.URLs @@ -98,10 +99,11 @@ var _ = Describe("Running Restorer", func() { baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts = brtypes.RestoreOptions{ Config: &brtypes.RestorationConfig{ - RestoreDataDir: etcdDir, + DataDir: etcdDir, + TempDir: tempDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, Name: restoreName, @@ -137,16 +139,16 @@ var _ = Describe("Running Restorer", func() { restoreOpts.Config.InitialAdvertisePeerURLs = []string{"http://localhost:2390"} restoreOpts.ClusterURLs, err = types.NewURLsMap(restoreOpts.Config.InitialCluster) - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) 
Expect(err).Should(HaveOccurred()) }) }) Context("with invalid restore directory", func() { It("should fail to restore", func() { - restoreOpts.Config.RestoreDataDir = "" + restoreOpts.Config.DataDir = "" - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).Should(HaveOccurred()) }) }) @@ -156,7 +158,7 @@ var _ = Describe("Running Restorer", func() { restoreOpts.BaseSnapshot.SnapDir = "test" restoreOpts.BaseSnapshot.SnapName = "test" - err := rstr.RestoreAndStopEtcd(restoreOpts, nil) + err := restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).Should(HaveOccurred()) }) }) @@ -182,10 +184,10 @@ var _ = Describe("Running Restorer", func() { Context("with maximum of one fetcher allowed", func() { It("should restore etcd data directory", func() { restoreOpts.Config.MaxFetchers = 1 - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -194,10 +196,10 @@ var _ = Describe("Running Restorer", func() { It("should restore etcd data directory", func() { restoreOpts.Config.MaxFetchers = 4 - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -206,10 +208,10 @@ var _ = Describe("Running Restorer", func() { It("should restore etcd data directory", func() { restoreOpts.Config.MaxFetchers = 100 - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, 
nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -260,7 +262,8 @@ var _ = Describe("Running Restorer", func() { Expect(err).ShouldNot(HaveOccurred()) restorationConfig = &brtypes.RestorationConfig{ - RestoreDataDir: etcdDir, + DataDir: etcdDir, + TempDir: tempDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, Name: restoreName, @@ -309,7 +312,7 @@ var _ = Describe("Running Restorer", func() { logger.Infof("No of delta snapshots: %d", deltaSnapList.Len()) logger.Infof("Base snapshot is %v", baseSnapshot) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, BaseSnapshot: baseSnapshot, @@ -323,10 +326,10 @@ var _ = Describe("Running Restorer", func() { restoreOpts.BaseSnapshot.SnapName = "" } - err := rstr.RestoreAndStopEtcd(restoreOpts, nil) + err := restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -354,7 +357,7 @@ var _ = Describe("Running Restorer", func() { Expect(err).ShouldNot(HaveOccurred()) Expect(deltaSnapList.Len()).Should(BeZero()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -364,7 +367,7 @@ var _ = Describe("Running Restorer", func() { PeerURLs: peerUrls, } - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) @@ -398,7 +401,7 @@ var _ = Describe("Running Restorer", func() { 
logger.Infof("Removed snapshot to cause corruption %s", snapshotToRemove) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -408,11 +411,11 @@ var _ = Describe("Running Restorer", func() { PeerURLs: peerUrls, } - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).Should(HaveOccurred()) // the below consistency fails with index out of range error hence commented, // but the etcd directory is filled partially as part of the restore which should be relooked. - // err = checkDataConsistency(restoreOptions.Config.RestoreDataDir, logger) + // err = checkDataConsistency(restoreOptions.Config.DataDir, logger) // Expect(err).Should(HaveOccurred()) }) @@ -435,7 +438,7 @@ var _ = Describe("Running Restorer", func() { baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -446,7 +449,7 @@ var _ = Describe("Running Restorer", func() { } logger.Infoln("starting restore, restore directory exists already") - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) logger.Infof("Failed to restore because :: %s", err) Expect(err).Should(HaveOccurred()) @@ -483,7 +486,7 @@ var _ = Describe("Running Restorer", func() { baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -494,9 +497,9 @@ var _ = Describe("Running Restorer", func() { } logger.Infoln("starting restore while snapshotter is running") - err = 
rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) // Although the test has passed but the logic currently doesn't stop snapshotter explicitly but assumes that restore @@ -572,7 +575,7 @@ var _ = Describe("Running Restorer", func() { baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -582,9 +585,9 @@ var _ = Describe("Running Restorer", func() { PeerURLs: peerUrls, } - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -657,7 +660,7 @@ var _ = Describe("Running Restorer", func() { baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -667,9 +670,9 @@ var _ = Describe("Running Restorer", func() { PeerURLs: peerUrls, } - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger) + err = utils.CheckDataConsistency(testCtx, 
restoreOpts.Config.DataDir, keyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -678,7 +681,7 @@ var _ = Describe("Running Restorer", func() { var _ = Describe("Running Restorer when both v1 and v2 directory structures are present", func() { var ( - rstr *Restorer + restorer *Restorer restorePeerURLs []string clusterUrlsMap types.URLsMap peerUrls types.URLs @@ -687,9 +690,10 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are store brtypes.SnapStore deltaSnapshotPeriod time.Duration ep []string - emDir string - compactDir string - cmpctStoreDir string + etcdDataDir string + restoreTempDir string + testDir string + storeDir string restorationConfig *brtypes.RestorationConfig ) const ( @@ -712,10 +716,11 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are ) BeforeEach(func() { deltaSnapshotPeriod = time.Second - compactDir = outputDir + "/compaction-test" - emDir = compactDir + "/default.etcd" - cmpctStoreDir = compactDir + "/snapshotter.bkp" - etcd, err = utils.StartEmbeddedEtcd(testCtx, emDir, logger, embeddedEtcdPortNo) + testDir = filepath.Join(outputDir, "restore-test-v1-v2") + etcdDataDir = filepath.Join(testDir, "default.etcd") + restoreTempDir = filepath.Join(testDir, "default.restore.tmp") + storeDir = filepath.Join(testDir, "snapshotter.bkp") + etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDataDir, logger, embeddedEtcdPortNo) Expect(err).ShouldNot(HaveOccurred()) ep = []string{etcd.Clients[0].Addr().String()} @@ -725,11 +730,12 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are peerUrls, err = types.NewURLs(restorePeerURLs) Expect(err).ShouldNot(HaveOccurred()) - store, err = snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: cmpctStoreDir, Provider: "Local", Prefix: "v2"}) + store, err = snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: storeDir, Provider: "Local", Prefix: "v2"}) Expect(err).ShouldNot(HaveOccurred()) restorationConfig 
= &brtypes.RestorationConfig{ - RestoreDataDir: emDir, + DataDir: etcdDataDir, + TempDir: restoreTempDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, Name: restoreName, @@ -751,7 +757,7 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are etcd.Server.Stop() etcd.Close() - err = os.RemoveAll(compactDir) + err = os.RemoveAll(testDir) Expect(err).ShouldNot(HaveOccurred()) }) @@ -760,13 +766,13 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are Context("With snapshots in v1 dir only", func() { It("should restore from v1 dir", func() { //Take snapshots for v1 dir - err = takeValidSnaps(logger, cmpctStoreDir, resp, deltaSnapshotPeriod, ep, v1, allSnapsInV1) + err = takeValidSnaps(logger, storeDir, resp, deltaSnapshotPeriod, ep, v1, allSnapsInV1) Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -779,12 +785,12 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are //Restore // remove the member dir - err = os.RemoveAll(path.Join(emDir, "member")) + err = os.RemoveAll(path.Join(etcdDataDir, "member")) Expect(err).ShouldNot(HaveOccurred()) - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, resp.KeyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, resp.KeyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -792,13 +798,13 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are Context("With first full snapshot in v1 dir and some incr snapshots are in v2 dir", 
func() { It("should restore from v1 and v2 dir", func() { //Take snapshots for v1 and v2 dir - err = takeValidSnaps(logger, cmpctStoreDir, resp, deltaSnapshotPeriod, ep, mixed, fullSnapInV1) + err = takeValidSnaps(logger, storeDir, resp, deltaSnapshotPeriod, ep, mixed, fullSnapInV1) Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -811,12 +817,12 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are //Restore // remove the member dir - err = os.RemoveAll(path.Join(emDir, "member")) + err = os.RemoveAll(path.Join(etcdDataDir, "member")) Expect(err).ShouldNot(HaveOccurred()) - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, resp.KeyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, resp.KeyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -824,13 +830,13 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are Context("With first full snapshots in v2 dir and some incr snapshots are in v1 dir", func() { It("should restore from v1 and v2 dir", func() { //Take snapshots for v1 and v2 dir - err = takeValidSnaps(logger, cmpctStoreDir, resp, deltaSnapshotPeriod, ep, mixed, fullSnapInV2) + err = takeValidSnaps(logger, storeDir, resp, deltaSnapshotPeriod, ep, mixed, fullSnapInV2) Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := 
brtypes.RestoreOptions{ Config: restorationConfig, @@ -843,12 +849,12 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are //Restore // remove the member dir - err = os.RemoveAll(path.Join(emDir, "member")) + err = os.RemoveAll(path.Join(etcdDataDir, "member")) Expect(err).ShouldNot(HaveOccurred()) - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, resp.KeyTo, logger) + err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, resp.KeyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -858,13 +864,13 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are Context("With snapshots in v2 dir only", func() { It("should restore from v2 dir snapshots", func() { // take snapshots for the v2 dir - err = takeValidSnaps(logger, cmpctStoreDir, resp, deltaSnapshotPeriod, ep, v2, allSnapsInV2) + err = takeValidSnaps(logger, storeDir, resp, deltaSnapshotPeriod, ep, v2, allSnapsInV2) Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -877,12 +883,12 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are //Restore // remove the member dir - err = os.RemoveAll(path.Join(emDir, "member")) + err = os.RemoveAll(path.Join(etcdDataDir, "member")) Expect(err).ShouldNot(HaveOccurred()) - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).ShouldNot(HaveOccurred()) - err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, resp.KeyTo, logger) + err = 
utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, resp.KeyTo, logger) Expect(err).ShouldNot(HaveOccurred()) }) }) @@ -891,13 +897,13 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are Context("with invalid snapshots in v1 directory", func() { It("should not restorer", func() { //Take invalid snapshots for v1 dir - err = takeInvalidV1Snaps(cmpctStoreDir) + err = takeInvalidV1Snaps(storeDir) Expect(err).ShouldNot(HaveOccurred()) baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) Expect(err).ShouldNot(HaveOccurred()) - rstr = NewRestorer(store, logger) + restorer = NewRestorer(store, logger) restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, @@ -910,10 +916,10 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are //Restore // remove the member dir - err = os.RemoveAll(path.Join(emDir, "member")) + err = os.RemoveAll(path.Join(etcdDataDir, "member")) Expect(err).ShouldNot(HaveOccurred()) - err = rstr.RestoreAndStopEtcd(restoreOpts, nil) + err = restorer.RestoreAndStopEtcd(restoreOpts, nil) Expect(err).Should(HaveOccurred()) }) }) diff --git a/pkg/snapshot/restorer/types_test.go b/pkg/snapshot/restorer/types_test.go index 38b671abc..8654b3b22 100644 --- a/pkg/snapshot/restorer/types_test.go +++ b/pkg/snapshot/restorer/types_test.go @@ -32,7 +32,8 @@ var _ = Describe("restorer types", func() { return &brtypes.RestorationConfig{ InitialCluster: s, InitialClusterToken: s, - RestoreDataDir: s, + DataDir: s, + TempDir: s, InitialAdvertisePeerURLs: []string{s, s}, Name: s, SkipHashCheck: b, diff --git a/pkg/types/restorer.go b/pkg/types/restorer.go index f3bf39042..bfb0e75ff 100644 --- a/pkg/types/restorer.go +++ b/pkg/types/restorer.go @@ -61,7 +61,8 @@ type RestoreOptions struct { type RestorationConfig struct { InitialCluster string `json:"initialCluster"` InitialClusterToken string `json:"initialClusterToken,omitempty"` - RestoreDataDir 
string `json:"restoreDataDir,omitempty"` + DataDir string `json:"dataDir,omitempty"` + TempDir string `json:"tempDir,omitempty"` InitialAdvertisePeerURLs []string `json:"initialAdvertisePeerURLs"` Name string `json:"name"` SkipHashCheck bool `json:"skipHashCheck,omitempty"` @@ -79,7 +80,8 @@ func NewRestorationConfig() *RestorationConfig { return &RestorationConfig{ InitialCluster: initialClusterFromName(defaultName), InitialClusterToken: defaultInitialClusterToken, - RestoreDataDir: fmt.Sprintf("%s.etcd", defaultName), + DataDir: fmt.Sprintf("%s.etcd", defaultName), + TempDir: fmt.Sprintf("%s.restore.tmp", defaultName), InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, Name: defaultName, SkipHashCheck: false, @@ -97,7 +99,8 @@ func NewRestorationConfig() *RestorationConfig { func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") - fs.StringVarP(&c.RestoreDataDir, "data-dir", "d", c.RestoreDataDir, "path to the data directory") + fs.StringVarP(&c.DataDir, "data-dir", "d", c.DataDir, "path to the data directory") + fs.StringVar(&c.TempDir, "restore-temp-dir", c.TempDir, "path to the temporary directory to store snapshot files during restoration") fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") @@ -125,12 +128,13 @@ func (c *RestorationConfig) Validate() error { return fmt.Errorf("max fetchers should be greater than zero") 
} if c.EmbeddedEtcdQuotaBytes <= 0 { - return fmt.Errorf("Etcd Quota size for etcd must be greater than 0") + return fmt.Errorf("etcd quota size for etcd must be greater than 0") } if c.AutoCompactionMode != "periodic" && c.AutoCompactionMode != "revision" { return fmt.Errorf("UnSupported auto-compaction-mode") } - c.RestoreDataDir = path.Clean(c.RestoreDataDir) + c.DataDir = path.Clean(c.DataDir) + c.TempDir = path.Clean(c.TempDir) return nil } From 853fc87e3c8d07fd01980e369bdecb1277a14685 Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Mon, 3 Apr 2023 23:12:03 +0530 Subject: [PATCH 2/7] Optimize decompression during restoration of delta snapshots --- pkg/snapshot/restorer/restorer.go | 132 ++++++++++++++++++++++++------ pkg/types/restorer.go | 4 +- 2 files changed, 109 insertions(+), 27 deletions(-) diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index a15da890a..b3da2bcc8 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -391,7 +391,7 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest firstDeltaSnap := snapList[0] - if err := r.applyFirstDeltaSnapshot(clientKV, *firstDeltaSnap); err != nil { + if err := r.applyFirstDeltaSnapshot(clientKV, firstDeltaSnap); err != nil { return err } if err := verifySnapshotRevision(clientKV, snapList[0]); err != nil { @@ -490,26 +490,23 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.Fet default: r.logger.Infof("Fetcher #%d fetching delta snapshot %s", fetcherIndex+1, path.Join(fetcherInfo.Snapshot.SnapDir, fetcherInfo.Snapshot.SnapName)) - eventsData, err := r.getEventsDataFromDeltaSnapshot(fetcherInfo.Snapshot) + rc, err := r.store.Fetch(fetcherInfo.Snapshot) if err != nil { - errCh <- fmt.Errorf("failed to read events data from delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) + errCh <- fmt.Errorf("failed to fetch delta snapshot %s from store : %v", 
fetcherInfo.Snapshot.SnapName, err) applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} // cannot use close(ch) as concurrent fetchSnaps routines might try to send on channel, causing a panic - return } - tempFilePath := filepath.Join(tempDir, fetcherInfo.Snapshot.SnapName) - eventsFilePath, err := persistDeltaSnapshot(eventsData, tempFilePath) - if err != nil { - errCh <- fmt.Errorf("failed to persist events data for delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) + snapTempFilePath := filepath.Join(tempDir, fetcherInfo.Snapshot.SnapName) + if err = persistRawDeltaSnapshot(rc, snapTempFilePath); err != nil { + errCh <- fmt.Errorf("failed to persist delta snapshot %s to temp file path %s : %v", fetcherInfo.Snapshot.SnapName, snapTempFilePath, err) applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} - return } - snapLocationsCh <- eventsFilePath // used for cleanup later + snapLocationsCh <- snapTempFilePath // used for cleanup later applierInfo := brtypes.ApplierInfo{ - EventsFilePath: eventsFilePath, - SnapIndex: fetcherInfo.SnapIndex, + SnapFilePath: snapTempFilePath, + SnapIndex: fetcherInfo.SnapIndex, } applierInfoCh <- applierInfo } @@ -536,7 +533,7 @@ func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.S } fetchedSnapIndex := applierInfo.SnapIndex - pathList[fetchedSnapIndex] = applierInfo.EventsFilePath + pathList[fetchedSnapIndex] = applierInfo.SnapFilePath if fetchedSnapIndex < nextSnapIndexToApply { errCh <- fmt.Errorf("snap index mismatch for delta snapshot %d; expected snap index to be atleast %d", fetchedSnapIndex, nextSnapIndexToApply) @@ -551,10 +548,10 @@ func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.S filePath := pathList[currSnapIndex] snapName := remainingSnaps[currSnapIndex].SnapName - r.logger.Infof("Reading snapshot %s from temporary file %s", snapName, filePath) - eventsData, err := os.ReadFile(filePath) + r.logger.Infof("Reading snapshot contents %s from raw snapshot 
file %s", snapName, filePath) + eventsData, err := r.readSnapshotContentsFromFile(filePath, remainingSnaps[currSnapIndex]) if err != nil { - errCh <- fmt.Errorf("failed to read events data from file for delta snapshot %s : %v", snapName, err) + errCh <- fmt.Errorf("failed to read events data from delta snapshot file %s : %v", filePath, err) return } @@ -599,10 +596,21 @@ func applyEventsAndVerify(clientKV client.KVCloser, events []brtypes.Event, snap } // applyFirstDeltaSnapshot applies the events from first delta snapshot to etcd. -func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap brtypes.Snapshot) error { +func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap *brtypes.Snapshot) error { r.logger.Infof("Applying first delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName)) - events, err := r.getEventsFromDeltaSnapshot(snap) + + rc, err := r.store.Fetch(*snap) if err != nil { + return fmt.Errorf("failed to fetch delta snapshot %s from store : %v", snap.SnapName, err) + } + + eventsData, err := r.readSnapshotContentsFromReadCloser(rc, snap) + if err != nil { + return fmt.Errorf("failed to read events data from delta snapshot %s : %v", snap.SnapName, err) + } + + events := []brtypes.Event{} + if err = json.Unmarshal(eventsData, &events); err != nil { return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err) } @@ -626,6 +634,8 @@ func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap brtype } } + r.logger.Infof("Applying first delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName)) + return applyEventsToEtcd(clientKV, events[newRevisionIndex:]) } @@ -699,21 +709,20 @@ func (r *Restorer) getEventsDataFromDeltaSnapshot(snap brtypes.Snapshot) ([]byte return data, nil } -// persistDeltaSnapshot writes delta snapshot events to disk and returns the file path for the same. 
-func persistDeltaSnapshot(data []byte, tempFilePath string) (string, error) {
+func persistRawDeltaSnapshot(rc io.ReadCloser, tempFilePath string) error {
 	tempFile, err := os.Create(tempFilePath)
 	if err != nil {
 		err = fmt.Errorf("failed to create temp file %s", tempFilePath)
-		return "", err
+		return err
 	}
 	defer tempFile.Close()
 
-	if _, err = tempFile.Write(data); err != nil {
-		err = fmt.Errorf("failed to write events data into temp file %s", tempFilePath)
-		return "", err
+	if _, err = tempFile.ReadFrom(rc); err != nil {
+		rc.Close() // best-effort close; surface the read error to the caller
+		return err
 	}
 
-	return tempFile.Name(), nil
+	return rc.Close()
 }
 
 // applyEventsToEtcd performs operations in events sequentially.
@@ -761,6 +770,79 @@ func verifySnapshotRevision(clientKV client.KVCloser, snap *brtypes.Snapshot) er
 	return nil
 }
+
+// getDecompressedSnapshotReadCloser passes the given ReadCloser through the
+// snapshot decompressor if the snapshot is compressed using a compression policy.
+// Also returns whether the snapshot was initially compressed or not, as well as
+// the compression policy used for compressing the snapshot.
+func getDecompressedSnapshotReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) (io.ReadCloser, bool, string, error) {
+	isCompressed, compressionPolicy, err := compressor.IsSnapshotCompressed(snap.CompressionSuffix)
+	if err != nil {
+		return nil, false, compressionPolicy, err
+	}
+
+	if isCompressed {
+		// decompress the snapshot
+		rc, err = compressor.DecompressSnapshot(rc, compressionPolicy)
+		if err != nil {
+			return nil, true, compressionPolicy, fmt.Errorf("unable to decompress the snapshot: %v", err)
+		}
+	}
+
+	return rc, isCompressed, compressionPolicy, nil
+}
+
+func (r *Restorer) readSnapshotContentsFromReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) ([]byte, error) {
+	startTime := time.Now()
+
+	rc, wasCompressed, compressionPolicy, err := getDecompressedSnapshotReadCloser(rc, snap)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decompress delta snapshot %s : %v", snap.SnapName, err)
+	}
+
+	buf := new(bytes.Buffer)
+	bufSize, err := buf.ReadFrom(rc)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse contents from delta snapshot %s : %v", snap.SnapName, err)
+	}
+
+	totalTime := time.Now().Sub(startTime).Seconds()
+	if wasCompressed {
+		r.logger.Infof("successfully fetched data of delta snapshot in %v seconds [CompressionPolicy:%v]", totalTime, compressionPolicy)
+	} else {
+		r.logger.Infof("successfully fetched data of delta snapshot in %v seconds", totalTime)
+	}
+
+	if bufSize <= sha256.Size {
+		return nil, fmt.Errorf("delta snapshot is missing hash")
+	}
+
+	sha := buf.Bytes()
+	data := sha[:bufSize-sha256.Size]
+	snapHash := sha[bufSize-sha256.Size:]
+
+	// check for match
+	h := sha256.New()
+	if _, err := h.Write(data); err != nil {
+		return nil, fmt.Errorf("unable to check integrity of snapshot %s: %v", snap.SnapName, err)
+	}
+
+	computedSha := h.Sum(nil)
+	if !reflect.DeepEqual(snapHash, computedSha) {
+		return nil, fmt.Errorf("expected sha256 %v, got %v", snapHash, computedSha)
+	}
+
+	return data, nil
+}
+
+func (r *Restorer) 
readSnapshotContentsFromFile(filePath string, snap *brtypes.Snapshot) ([]byte, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open file %s for delta snapshot %s : %v", filePath, snap.SnapName, err) + } + + return r.readSnapshotContentsFromReadCloser(file, snap) +} + // errorArrayToError takes an array of errors and returns a single concatenated error func errorArrayToError(errs []error) error { var errString string diff --git a/pkg/types/restorer.go b/pkg/types/restorer.go index bfb0e75ff..7910a49d0 100644 --- a/pkg/types/restorer.go +++ b/pkg/types/restorer.go @@ -191,8 +191,8 @@ type FetcherInfo struct { // ApplierInfo stores the info about applier type ApplierInfo struct { - EventsFilePath string - SnapIndex int + SnapFilePath string + SnapIndex int } // DeepCopyInto copies the structure deeply from in to out. From c8a278af9cd7a50cc683e51461f2cf7e5ae52faa Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Tue, 4 Apr 2023 17:52:57 +0530 Subject: [PATCH 3/7] Address review comments from @unmarshall --- pkg/snapshot/restorer/restorer.go | 83 +++++++++++++++++--------- pkg/snapshot/restorer/restorer_test.go | 39 ++++++++++++ pkg/types/restorer.go | 2 +- 3 files changed, 96 insertions(+), 28 deletions(-) diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index b3da2bcc8..4c61a795c 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -48,7 +48,6 @@ import ( "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" - "go.etcd.io/etcd/pkg/fileutil" "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -57,6 +56,10 @@ import ( "go.uber.org/zap" ) +const ( + etcdDialTimeout = time.Second * 30 +) + // Restorer is a struct for etcd data directory restorer type Restorer struct { logger *logrus.Entry @@ -89,16 +92,16 @@ func (r *Restorer) RestoreAndStopEtcd(ro brtypes.RestoreOptions, m member.Contro // 
Restore restores the etcd data directory as per specified restore options but returns the ETCD server that it statrted. func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.Etcd, error) { if err := r.restoreFromBaseSnapshot(ro); err != nil { - return nil, fmt.Errorf("failed to restore from the base snapshot :%v", err) + return nil, fmt.Errorf("failed to restore from the base snapshot: %v", err) } if len(ro.DeltaSnapList) == 0 { r.logger.Infof("No delta snapshots present over base snapshot.") return nil, nil } - r.logger.Infof("Attempting to apply %d delta snapshots for restoration.", len(ro.DeltaSnapList)) - r.logger.Infof("Creating temporary directory for persisting delta snapshots locally.") + r.logger.Infof("Attempting to apply %d delta snapshots for restoration.", len(ro.DeltaSnapList)) + r.logger.Infof("Creating temporary directory %s for persisting delta snapshots locally.", ro.Config.TempDir) err := os.MkdirAll(ro.Config.TempDir, 0700) if err != nil { @@ -152,8 +155,7 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed. func (r *Restorer) restoreFromBaseSnapshot(ro brtypes.RestoreOptions) error { var err error if path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName) == "" { - r.logger.Warnf("Base snapshot path not provided. 
Will do nothing.") - return nil + return fmt.Errorf("base snapshot path not provided") } r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName)) cfg := etcdserver.ServerConfig{ @@ -205,7 +207,7 @@ func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, sk } defer rc.Close() - if err := fileutil.CreateDirAll(snapDir); err != nil { + if err := os.MkdirAll(snapDir, 0700); err != nil { return err } @@ -217,7 +219,11 @@ func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, sk if _, err := io.Copy(db, rc); err != nil { return err } - db.Sync() + + if err := db.Sync(); err != nil { + return err + } + totalTime := time.Now().Sub(startTime).Seconds() if isCompressed { @@ -269,7 +275,10 @@ func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, sk } // db hash is OK - db.Close() + if err := db.Close(); err != nil { + return err + } + // update consistentIndex so applies go through on etcdserver despite // having a new raft instance be := backend.NewDefaultBackend(dbPath) @@ -286,19 +295,32 @@ func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, sk } // delete stored members from old cluster since using new members - btx.UnsafeForEach([]byte("members"), del) + if err := btx.UnsafeForEach([]byte("members"), del); err != nil { + return err + } + // todo: add back new members when we start to deprecate old snap file. 
- btx.UnsafeForEach([]byte("members_removed"), del) + if err := btx.UnsafeForEach([]byte("members_removed"), del); err != nil { + return err + } + // trigger write-out of new consistent index txn.End() s.Commit() - s.Close() - be.Close() + + if err := s.Close(); err != nil { + return err + } + + if err := be.Close(); err != nil { + return err + } + return nil } func makeWALAndSnap(logger *zap.Logger, walDir, snapDir string, cl *membership.RaftCluster, restoreName string) error { - if err := fileutil.CreateDirAll(walDir); err != nil { + if err := os.MkdirAll(walDir, 0700); err != nil { return err } @@ -469,7 +491,7 @@ func (r *Restorer) cleanup(snapLocationsCh chan string, stopCh chan bool, wg *sy if len(errs) != 0 { r.logger.Error("Cleanup failed") - return errorArrayToError(errs) + return ErrorArrayToError(errs) } r.logger.Info("Cleanup complete") @@ -555,9 +577,9 @@ func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.S return } - events := []brtypes.Event{} + var events []brtypes.Event if err = json.Unmarshal(eventsData, &events); err != nil { - errCh <- fmt.Errorf("failed to read events from events data for delta snapshot %s : %v", snapName, err) + errCh <- fmt.Errorf("failed to unmarshal events from events data for delta snapshot %s : %v", snapName, err) return } @@ -567,7 +589,7 @@ func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.S return } - r.logger.Infof("Removing temporary snapshot file %s for snapshot %s", filePath, snapName) + r.logger.Infof("Removing temporary delta snapshot events file %s for snapshot %s", filePath, snapName) if err = os.Remove(filePath); err != nil { r.logger.Warnf("Unable to remove file: %s; err: %v", filePath, err) } @@ -609,9 +631,9 @@ func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap *brtyp return fmt.Errorf("failed to read events data from delta snapshot %s : %v", snap.SnapName, err) } - events := []brtypes.Event{} + var events []brtypes.Event if err 
= json.Unmarshal(eventsData, &events); err != nil { - return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err) + return fmt.Errorf("failed to unmarshal events data from delta snapshot %s : %v", snap.SnapName, err) } // Note: Since revision in full snapshot file name might be lower than actual revision stored in snapshot. @@ -619,7 +641,8 @@ func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap *brtyp // the latest revision from full snapshot may overlap with first few revision on first delta snapshot // Hence, we have to additionally take care of that. // Refer: https://github.com/coreos/etcd/issues/9037 - ctx := context.TODO() + ctx, cancel := context.WithTimeout(context.TODO(), etcdDialTimeout) + defer cancel() resp, err := clientKV.Get(ctx, "", clientv3.WithLastRev()...) if err != nil { return fmt.Errorf("failed to get etcd latest revision: %v", err) @@ -712,10 +735,12 @@ func (r *Restorer) getEventsDataFromDeltaSnapshot(snap brtypes.Snapshot) ([]byte func persistRawDeltaSnapshot(rc io.ReadCloser, tempFilePath string) error { tempFile, err := os.Create(tempFilePath) if err != nil { - err = fmt.Errorf("failed to create temp file %s", tempFilePath) + err = fmt.Errorf("failed to create temp file %s to store raw delta snapshot", tempFilePath) return err } - defer tempFile.Close() + defer func() { + _ = tempFile.Close() + }() _, err = tempFile.ReadFrom(rc) if err != nil { @@ -807,9 +832,9 @@ func (r *Restorer) readSnapshotContentsFromReadCloser(rc io.ReadCloser, snap *br totalTime := time.Now().Sub(startTime).Seconds() if wasCompressed { - r.logger.Infof("successfully fetched data of delta snapshot in %v seconds [CompressionPolicy:%v]", totalTime, compressionPolicy) + r.logger.Infof("successfully decompressed data of delta snapshot in %v seconds [CompressionPolicy:%v]", totalTime, compressionPolicy) } else { - r.logger.Infof("successfully fetched data of delta snapshot in %v seconds", totalTime) + 
r.logger.Infof("successfully decompressed data of delta snapshot in %v seconds", totalTime) } if bufSize <= sha256.Size { @@ -843,8 +868,12 @@ func (r *Restorer) readSnapshotContentsFromFile(filePath string, snap *brtypes.S return r.readSnapshotContentsFromReadCloser(file, snap) } -// errorArrayToError takes an array of errors and returns a single concatenated error -func errorArrayToError(errs []error) error { +// ErrorArrayToError takes an array of errors and returns a single concatenated error +func ErrorArrayToError(errs []error) error { + if len(errs) == 0 { + return nil + } + var errString string for _, e := range errs { diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 8d0bd4a95..8baa4e820 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -926,6 +926,45 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are }) }) +var _ = Describe("Unit testing individual functions for restorer package", func() { + Describe("testing ErrorArrayToError", func() { + var ( + errs []error + ) + Context("when error array has no elements", func() { + It("should return nil", func() { + errs = []error{} + Expect(ErrorArrayToError(errs)).Should(BeNil()) + }) + }) + Context("when error array has one element", func() { + It("should return nil", func() { + errs = []error{ + fmt.Errorf("error0"), + } + expectedErr := fmt.Errorf("error0") + + err := ErrorArrayToError(errs) + Expect(err).Should(Equal(expectedErr)) + }) + }) + Context("when error array has more than one element", func() { + It("should return nil", func() { + errs = []error{ + fmt.Errorf("error0"), + fmt.Errorf("error1"), + fmt.Errorf("error2"), + } + expectedErr := fmt.Errorf("error0\nerror1\nerror2") + + err := ErrorArrayToError(errs) + Expect(err).Should(Equal(expectedErr)) + }) + }) + }) + +}) + // corruptEtcdDir corrupts the etcd directory by deleting it func corruptEtcdDir() error { if _, err := 
os.Stat(etcdDir); os.IsNotExist(err) { diff --git a/pkg/types/restorer.go b/pkg/types/restorer.go index 7910a49d0..b7c1cedcf 100644 --- a/pkg/types/restorer.go +++ b/pkg/types/restorer.go @@ -100,7 +100,7 @@ func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") fs.StringVarP(&c.DataDir, "data-dir", "d", c.DataDir, "path to the data directory") - fs.StringVar(&c.TempDir, "restore-temp-dir", c.TempDir, "path to the temporary directory to store snapshot files during restoration") + fs.StringVar(&c.TempDir, "restoration-temp-dir", c.TempDir, "path to the temporary directory to store snapshot files during restoration") fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") From 0c40571ab9dd6d5a984d14342368374f61c0cc5c Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Tue, 4 Apr 2023 19:23:53 +0530 Subject: [PATCH 4/7] Revert change to handling of cases where no full snapshot is present --- pkg/snapshot/restorer/restorer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 4c61a795c..a837f61dd 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -155,7 +155,8 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed. 
func (r *Restorer) restoreFromBaseSnapshot(ro brtypes.RestoreOptions) error { var err error if path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName) == "" { - return fmt.Errorf("base snapshot path not provided") + r.logger.Warnf("Base snapshot path not provided. Will do nothing.") + return nil } r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName)) cfg := etcdserver.ServerConfig{ From 9c34025ca14c28a82b597d95a961ab435e5280e1 Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Wed, 5 Apr 2023 08:41:30 +0530 Subject: [PATCH 5/7] Address review comments from @aaronfern --- pkg/snapshot/restorer/restorer.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index a837f61dd..82f6fca3a 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -796,31 +796,32 @@ func verifySnapshotRevision(clientKV client.KVCloser, snap *brtypes.Snapshot) er return nil } -// getDecompressedSnapshotReadCloser passes the given ReadCloser through the +// getNormalizedSnapshotReadCloser passes the given ReadCloser through the // snapshot decompressor if the snapshot is compressed using a compression policy. -// Also returns whether the snapshot was initially compressed or not, as well as +// If snapshot is not compressed, it returns the given ReadCloser as is. +// It also returns whether the snapshot was initially compressed or not, as well as // the compression policy used for compressing the snapshot. 
-func getDecompressedSnapshotReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) (io.ReadCloser, bool, string, error) { +func getNormalizedSnapshotReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) (io.ReadCloser, bool, string, error) { isCompressed, compressionPolicy, err := compressor.IsSnapshotCompressed(snap.CompressionSuffix) if err != nil { - return nil, false, compressionPolicy, err + return rc, false, "", err } if isCompressed { // decompress the snapshot rc, err = compressor.DecompressSnapshot(rc, compressionPolicy) if err != nil { - return nil, true, compressionPolicy, fmt.Errorf("unable to decompress the snapshot: %v", err) + return rc, true, compressionPolicy, fmt.Errorf("unable to decompress the snapshot: %v", err) } } - return rc, true, compressionPolicy, nil + return rc, false, "", nil } func (r *Restorer) readSnapshotContentsFromReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) ([]byte, error) { startTime := time.Now() - rc, wasCompressed, compressionPolicy, err := getDecompressedSnapshotReadCloser(rc, snap) + rc, wasCompressed, compressionPolicy, err := getNormalizedSnapshotReadCloser(rc, snap) if err != nil { return nil, fmt.Errorf("failed to decompress delta snapshot %s : %v", snap.SnapName, err) } @@ -835,7 +836,7 @@ func (r *Restorer) readSnapshotContentsFromReadCloser(rc io.ReadCloser, snap *br if wasCompressed { r.logger.Infof("successfully decompressed data of delta snapshot in %v seconds [CompressionPolicy:%v]", totalTime, compressionPolicy) } else { - r.logger.Infof("successfully decompressed data of delta snapshot in %v seconds", totalTime) + r.logger.Infof("successfully read the data of delta snapshot in %v seconds", totalTime) } if bufSize <= sha256.Size { From a1b4c3bcbd0dc028d12c58235457af6740f972c8 Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Wed, 5 Apr 2023 12:43:11 +0530 Subject: [PATCH 6/7] Address review comments from @unmarshall and @aaronfern --- pkg/snapshot/restorer/restorer.go | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 82f6fca3a..4718d0297 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -193,6 +193,7 @@ func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, sk if err != nil { return err } + defer rc.Close() startTime := time.Now() isCompressed, compressionPolicy, err := compressor.IsSnapshotCompressed(snap.CompressionSuffix) @@ -206,7 +207,6 @@ func (r *Restorer) makeDB(snapDir string, snap *brtypes.Snapshot, commit int, sk return fmt.Errorf("unable to decompress the snapshot: %v", err) } } - defer rc.Close() if err := os.MkdirAll(snapDir, 0700); err != nil { return err @@ -815,7 +815,7 @@ func getNormalizedSnapshotReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) ( } } - return rc, false, "", nil + return rc, isCompressed, compressionPolicy, nil } func (r *Restorer) readSnapshotContentsFromReadCloser(rc io.ReadCloser, snap *brtypes.Snapshot) ([]byte, error) { From 8f4e5855d8da6537fe4d0c4e8271e4e61ddfb66a Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Wed, 5 Apr 2023 14:29:46 +0530 Subject: [PATCH 7/7] Change CLI flag to `restoration-temp-snapshots-dir` and RestorationConfig field to `TempSnapshotsDir` --- pkg/compactor/compactor_test.go | 2 +- pkg/snapshot/restorer/restorer.go | 10 +++++----- pkg/snapshot/restorer/restorer_test.go | 6 +++--- pkg/snapshot/restorer/types_test.go | 2 +- pkg/types/restorer.go | 8 ++++---- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 2af498290..371c5894d 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -75,7 +75,7 @@ var _ = Describe("Running Compactor", func() { InitialCluster: restoreCluster, InitialClusterToken: restoreClusterToken, DataDir: tempDataDir, - TempDir: tempRestorationSnapshotsDir, + TempSnapshotsDir: 
tempRestorationSnapshotsDir, InitialAdvertisePeerURLs: restorePeerURLs, Name: restoreName, SkipHashCheck: skipHashCheck, diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 4718d0297..a7ccdb3ef 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -101,16 +101,16 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed. } r.logger.Infof("Attempting to apply %d delta snapshots for restoration.", len(ro.DeltaSnapList)) - r.logger.Infof("Creating temporary directory %s for persisting delta snapshots locally.", ro.Config.TempDir) + r.logger.Infof("Creating temporary directory %s for persisting delta snapshots locally.", ro.Config.TempSnapshotsDir) - err := os.MkdirAll(ro.Config.TempDir, 0700) + err := os.MkdirAll(ro.Config.TempSnapshotsDir, 0700) if err != nil { return nil, err } defer func() { - if err := os.RemoveAll(ro.Config.TempDir); err != nil { - r.logger.Errorf("Failed to remove restoration temp directory %s: %v", ro.Config.TempDir, err) + if err := os.RemoveAll(ro.Config.TempSnapshotsDir); err != nil { + r.logger.Errorf("Failed to remove restoration temp directory %s: %v", ro.Config.TempSnapshotsDir, err) } }() @@ -441,7 +441,7 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest go r.applySnaps(clientKV, remainingSnaps, applierInfoCh, errCh, stopCh, &wg) for f := 0; f < numFetchers; f++ { - go r.fetchSnaps(f, fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg, ro.Config.TempDir) + go r.fetchSnaps(f, fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg, ro.Config.TempSnapshotsDir) } for i, remainingSnap := range remainingSnaps { diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 8baa4e820..66362d593 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -103,7 +103,7 @@ var _ = Describe("Running Restorer", func() 
{ restoreOpts = brtypes.RestoreOptions{ Config: &brtypes.RestorationConfig{ DataDir: etcdDir, - TempDir: tempDir, + TempSnapshotsDir: tempDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, Name: restoreName, @@ -263,7 +263,7 @@ var _ = Describe("Running Restorer", func() { restorationConfig = &brtypes.RestorationConfig{ DataDir: etcdDir, - TempDir: tempDir, + TempSnapshotsDir: tempDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, Name: restoreName, @@ -735,7 +735,7 @@ var _ = Describe("Running Restorer when both v1 and v2 directory structures are restorationConfig = &brtypes.RestorationConfig{ DataDir: etcdDataDir, - TempDir: restoreTempDir, + TempSnapshotsDir: restoreTempDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, Name: restoreName, diff --git a/pkg/snapshot/restorer/types_test.go b/pkg/snapshot/restorer/types_test.go index 8654b3b22..3df92f60e 100644 --- a/pkg/snapshot/restorer/types_test.go +++ b/pkg/snapshot/restorer/types_test.go @@ -33,7 +33,7 @@ var _ = Describe("restorer types", func() { InitialCluster: s, InitialClusterToken: s, DataDir: s, - TempDir: s, + TempSnapshotsDir: s, InitialAdvertisePeerURLs: []string{s, s}, Name: s, SkipHashCheck: b, diff --git a/pkg/types/restorer.go b/pkg/types/restorer.go index b7c1cedcf..c9eb9d25b 100644 --- a/pkg/types/restorer.go +++ b/pkg/types/restorer.go @@ -62,7 +62,7 @@ type RestorationConfig struct { InitialCluster string `json:"initialCluster"` InitialClusterToken string `json:"initialClusterToken,omitempty"` DataDir string `json:"dataDir,omitempty"` - TempDir string `json:"tempDir,omitempty"` + TempSnapshotsDir string `json:"tempDir,omitempty"` InitialAdvertisePeerURLs []string `json:"initialAdvertisePeerURLs"` Name string `json:"name"` SkipHashCheck bool `json:"skipHashCheck,omitempty"` @@ -81,7 +81,7 @@ func NewRestorationConfig() *RestorationConfig { InitialCluster: initialClusterFromName(defaultName), 
InitialClusterToken: defaultInitialClusterToken, DataDir: fmt.Sprintf("%s.etcd", defaultName), - TempDir: fmt.Sprintf("%s.restore.tmp", defaultName), + TempSnapshotsDir: fmt.Sprintf("%s.restoration.tmp", defaultName), InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, Name: defaultName, SkipHashCheck: false, @@ -100,7 +100,7 @@ func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") fs.StringVarP(&c.DataDir, "data-dir", "d", c.DataDir, "path to the data directory") - fs.StringVar(&c.TempDir, "restoration-temp-dir", c.TempDir, "path to the temporary directory to store snapshot files during restoration") + fs.StringVar(&c.TempSnapshotsDir, "restoration-temp-snapshots-dir", c.TempSnapshotsDir, "path to the temporary directory to store snapshot files during restoration") fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") @@ -134,7 +134,7 @@ func (c *RestorationConfig) Validate() error { return fmt.Errorf("UnSupported auto-compaction-mode") } c.DataDir = path.Clean(c.DataDir) - c.TempDir = path.Clean(c.TempDir) + c.TempSnapshotsDir = path.Clean(c.TempSnapshotsDir) return nil }