Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and optimize restoration of delta snapshots #609

Merged
merged 7 commits into from
Apr 5, 2023
1 change: 1 addition & 0 deletions .ci/unit_test
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ if [ -z $COVER ] || [ "$COVER" = false ] ; then
ginkgo -race -trace $GINKGO_COMMON_FLAGS -gcflags=all=-d=checkptr=0 --randomizeAllSpecs --randomizeSuites --failOnPending --skip="NEGATIVE\:.*" ${TEST_PACKAGES}

if [ "$RUN_NEGATIVE" = "true" ] ; then
echo "[INFO] Running negative tests now..."
#run negative scenario in a sequenced manner (removed failOnPending as one spec in restore test is marked as 'X' for excluding)
ginkgo -race -trace $GINKGO_COMMON_FLAGS -gcflags=all=-d=checkptr=0 --focus="NEGATIVE\:.*" ${TEST_PACKAGES}
fi
Expand Down
44 changes: 21 additions & 23 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,40 +55,38 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
cp.logger.Info("Start compacting")

// Deepcopy restoration options ro to avoid any mutation of the passing object
cmpctOptions := opts.RestoreOptions.DeepCopy()
compactorRestoreOptions := opts.RestoreOptions.DeepCopy()

// If no basesnapshot is found, abort compaction as there would be nothing to compact
if cmpctOptions.BaseSnapshot == nil {
// If no base snapshot is found, abort compaction as there would be nothing to compact
if compactorRestoreOptions.BaseSnapshot == nil {
shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
cp.logger.Error("No base snapshot found. Nothing is available for compaction")
return nil, fmt.Errorf("no base snapshot found. Nothing is available for compaction")
}

// Set a temporary etcd data directory for embedded etcd
prefix := cmpctOptions.Config.RestoreDataDir
if prefix == "" {
prefix = "/tmp"
}
cmpctDir, err := os.MkdirTemp(prefix, "compactor-")
cp.logger.Infof("Creating temporary etcd directory %s for restoration.", compactorRestoreOptions.Config.DataDir)
err := os.MkdirAll(compactorRestoreOptions.Config.DataDir, 0700)
if err != nil {
cp.logger.Errorf("Unable to create temporary etcd directory for compaction: %s", err.Error())
return nil, err
}

defer os.RemoveAll(cmpctDir)

cmpctOptions.Config.RestoreDataDir = cmpctDir
defer func() {
if err := os.RemoveAll(compactorRestoreOptions.Config.DataDir); err != nil {
cp.logger.Errorf("Failed to remove temporary etcd directory %s: %v", compactorRestoreOptions.Config.DataDir, err)
}
}()

// Then restore from the snapshots
r := restorer.NewRestorer(cp.store, cp.logger)
embeddedEtcd, err := r.Restore(*cmpctOptions, nil)
embeddedEtcd, err := r.Restore(*compactorRestoreOptions, nil)
if err != nil {
return nil, fmt.Errorf("unable to restore snapshots during compaction: %v", err)
}

cp.logger.Info("Restoration for compaction is over")
cp.logger.Info("Restoration for compaction is done.")
// There is a possibility that restore operation may not start an embedded ETCD.
if embeddedEtcd == nil {
embeddedEtcd, err = miscellaneous.StartEmbeddedEtcd(cp.logger, cmpctOptions)
embeddedEtcd, err = miscellaneous.StartEmbeddedEtcd(cp.logger, compactorRestoreOptions)
unmarshall marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand All @@ -104,8 +102,8 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
// Then compact ETCD

// Build Client
clientFactory := etcdutil.NewClientFactory(cmpctOptions.NewClientFactory, brtypes.EtcdConnectionConfig{
MaxCallSendMsgSize: cmpctOptions.Config.MaxCallSendMsgSize,
clientFactory := etcdutil.NewClientFactory(compactorRestoreOptions.NewClientFactory, brtypes.EtcdConnectionConfig{
MaxCallSendMsgSize: compactorRestoreOptions.Config.MaxCallSendMsgSize,
Endpoints: ep,
InsecureTransport: true,
})
Expand Down Expand Up @@ -150,22 +148,22 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
}
}

// Then take snapeshot of ETCD
// Then take snapshot of ETCD
snapshotReqCtx, cancel := context.WithTimeout(ctx, opts.SnapshotTimeout.Duration)
defer cancel()

// Determine suffix of compacted snapshot that will be result of this compaction
suffix := cmpctOptions.BaseSnapshot.CompressionSuffix
if len(cmpctOptions.DeltaSnapList) > 0 {
suffix = cmpctOptions.DeltaSnapList[cmpctOptions.DeltaSnapList.Len()-1].CompressionSuffix
suffix := compactorRestoreOptions.BaseSnapshot.CompressionSuffix
if len(compactorRestoreOptions.DeltaSnapList) > 0 {
suffix = compactorRestoreOptions.DeltaSnapList[compactorRestoreOptions.DeltaSnapList.Len()-1].CompressionSuffix
}

isCompressed, compressionPolicy, err := compressor.IsSnapshotCompressed(suffix)
unmarshall marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("unable to determine if snapshot is compressed: %v", cmpctOptions.BaseSnapshot.CompressionSuffix)
return nil, fmt.Errorf("unable to determine if snapshot is compressed: %v", compactorRestoreOptions.BaseSnapshot.CompressionSuffix)
}

isFinal := cmpctOptions.BaseSnapshot.IsFinal
isFinal := compactorRestoreOptions.BaseSnapshot.IsFinal

cc := &compressor.CompressionConfig{Enabled: isCompressed, CompressionPolicy: compressionPolicy}
snapshot, err := etcdutil.TakeAndSaveFullSnapshot(snapshotReqCtx, clientMaintenance, cp.store, etcdRevision, cc, suffix, isFinal, cp.logger)
Expand Down
26 changes: 13 additions & 13 deletions pkg/compactor/compactor_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ const (
)

var (
testSuitDir, testEtcdDir, testSnapshotDir string
testCtx = context.Background()
logger = logrus.New().WithField("suite", "compactor")
etcd *embed.Etcd
err error
keyTo int
endpoints []string
testSuiteDir, testEtcdDir, testSnapshotDir string
testCtx = context.Background()
logger = logrus.New().WithField("suite", "compactor")
etcd *embed.Etcd
err error
keyTo int
endpoints []string
)

func TestCompactor(t *testing.T) {
Expand All @@ -56,13 +56,13 @@ var _ = SynchronizedBeforeSuite(func() []byte {
)

// Create a directory for compaction test cases
testSuitDir, err = os.MkdirTemp("/tmp", "compactor-test")
testSuiteDir, err = os.MkdirTemp("/tmp", "compactor-test-")
Expect(err).ShouldNot(HaveOccurred())

// Directory for the main ETCD process
testEtcdDir := fmt.Sprintf("%s/etcd/default.etcd", testSuitDir)
testEtcdDir := fmt.Sprintf("%s/etcd/default.etcd", testSuiteDir)
// Directory for storing the backups
testSnapshotDir := fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir)
testSnapshotDir := fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuiteDir)

logger.Infof("ETCD Directory is: %s", testEtcdDir)
logger.Infof("Snapshot Directory is: %s", testSnapshotDir)
Expand Down Expand Up @@ -91,7 +91,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig)
Expect(err).ShouldNot(HaveOccurred())

// Wait unitil the populator finishes with populating ETCD
// Wait until the populator finishes with populating ETCD
wg.Wait()

keyTo = resp.KeyTo
Expand All @@ -106,7 +106,7 @@ func cleanUp() {
etcd.Server.Stop()
etcd.Close()

logger.Infof("All tests are done for compactor suite. %s is being removed.", testSuitDir)
err = os.RemoveAll(testSuitDir)
logger.Infof("All tests are done for compactor suite. %s is being removed.", testSuiteDir)
err = os.RemoveAll(testSuiteDir)
Expect(err).ShouldNot(HaveOccurred())
}
54 changes: 33 additions & 21 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,27 @@ var _ = Describe("Running Compactor", func() {
var compactorConfig *brtypes.CompactorConfig
var compactOptions *brtypes.CompactOptions
var compactedSnapshot *brtypes.Snapshot
var restoreDir string
var tempRestoreDir string

BeforeEach(func() {
dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir)
dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuiteDir)
store, err = snapstore.GetSnapstore(&brtypes.SnapstoreConfig{Container: dir, Provider: "Local"})
Expect(err).ShouldNot(HaveOccurred())
fmt.Println("The store where compaction will save snapshot is: ", store)

tempDataDir, err := os.MkdirTemp(testSuiteDir, "compacted.etcd-")
Expect(err).ShouldNot(HaveOccurred())

tempRestorationSnapshotsDir, err := os.MkdirTemp(testSuiteDir, "temp-snapshots-")
Expect(err).ShouldNot(HaveOccurred())

cptr = compactor.NewCompactor(store, logger, nil)
restoreOpts = &brtypes.RestoreOptions{
Config: &brtypes.RestorationConfig{
InitialCluster: restoreCluster,
InitialClusterToken: restoreClusterToken,
RestoreDataDir: "/tmp",
DataDir: tempDataDir,
TempDir: tempRestorationSnapshotsDir,
InitialAdvertisePeerURLs: restorePeerURLs,
Name: restoreName,
SkipHashCheck: skipHashCheck,
Expand All @@ -93,26 +100,26 @@ var _ = Describe("Running Compactor", func() {
}
})

Context("with defragmention allowed", func() {
Context("with defragmentation allowed", func() {
AfterEach(func() {
_, err := os.Stat(restoreDir)
_, err := os.Stat(tempRestoreDir)
if err == nil {
os.RemoveAll(restoreDir)
os.RemoveAll(tempRestoreDir)
}
store.Delete(*compactedSnapshot)
})

It("should create a snapshot", func() {
restoreOpts.Config.MaxFetchers = 4

// Fetch latest snapshots
// Fetch the latest set of snapshots
baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
Expect(err).ShouldNot(HaveOccurred())

restoreOpts.BaseSnapshot = baseSnapshot
restoreOpts.DeltaSnapList = deltaSnapList

// Take the compacted full snapshot with defragmnetation allowed
// Take the compacted full snapshot with defragmentation allowed
_, err = cptr.Compact(testCtx, compactOptions)
Expect(err).ShouldNot(HaveOccurred())

Expand All @@ -130,7 +137,7 @@ var _ = Describe("Running Compactor", func() {
It("should restore from compacted snapshot", func() {
restoreOpts.Config.MaxFetchers = 4

// Fetch latest snapshots
// Fetch the latest set of snapshots
baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
Expect(err).ShouldNot(HaveOccurred())

Expand All @@ -151,10 +158,15 @@ var _ = Describe("Running Compactor", func() {
Expect(size).ShouldNot(BeZero())

// Restore from the compacted snapshot
restoreDir, err = os.MkdirTemp("/tmp", "restore-")
tempRestoreDir, err = os.MkdirTemp(testSuiteDir, "restore-test-")
Expect(err).ShouldNot(HaveOccurred())

restoreOpts.Config.RestoreDataDir = restoreDir
defer func() {
err := os.RemoveAll(tempRestoreDir)
Expect(err).ShouldNot(HaveOccurred())
}()

restoreOpts.Config.DataDir = tempRestoreDir

restoreOpts.BaseSnapshot = compactedSnapshot
restoreOpts.DeltaSnapList = deltaSnapList
Expand All @@ -164,22 +176,22 @@ var _ = Describe("Running Compactor", func() {
err = rstr.RestoreAndStopEtcd(*restoreOpts, nil)

Expect(err).ShouldNot(HaveOccurred())
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger)
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger)
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("with defragmention not allowed", func() {
Context("with defragmentation not allowed", func() {
AfterEach(func() {
_, err := os.Stat(restoreDir)
_, err := os.Stat(tempRestoreDir)
if err != nil {
os.RemoveAll(restoreDir)
os.RemoveAll(tempRestoreDir)
}
store.Delete(*compactedSnapshot)
})
It("should create a snapshot", func() {
restoreOpts.Config.MaxFetchers = 4

// Fetch latest snapshots
// Fetch the latest set of snapshots
baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
Expect(err).ShouldNot(HaveOccurred())

Expand All @@ -205,7 +217,7 @@ var _ = Describe("Running Compactor", func() {
It("should restore from compacted snapshot", func() {
restoreOpts.Config.MaxFetchers = 4

// Fetch latest snapshots
// Fetch the latest set of snapshots
baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
Expect(err).ShouldNot(HaveOccurred())

Expand All @@ -227,10 +239,10 @@ var _ = Describe("Running Compactor", func() {
Expect(size).ShouldNot(BeZero())

// Restore from the compacted snapshot
restoreDir, err = os.MkdirTemp("/tmp", "restore-")
tempRestoreDir, err = os.MkdirTemp(testSuiteDir, "restore-test-")
Expect(err).ShouldNot(HaveOccurred())

restoreOpts.Config.RestoreDataDir = restoreDir
restoreOpts.Config.DataDir = tempRestoreDir

restoreOpts.BaseSnapshot = compactedSnapshot
restoreOpts.DeltaSnapList = deltaSnapList
Expand All @@ -240,11 +252,11 @@ var _ = Describe("Running Compactor", func() {
err = rstr.RestoreAndStopEtcd(*restoreOpts, nil)

Expect(err).ShouldNot(HaveOccurred())
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger)
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger)
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("with no basesnapshot in backup store", func() {
Context("with no base snapshot in backup store", func() {
It("should not run compaction", func() {
restoreOpts.Config.MaxFetchers = 4

Expand Down
20 changes: 10 additions & 10 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6
}

// NewInitializer creates an etcd initializer object.
func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.SnapstoreConfig, etcdConnectionConfig *brtypes.EtcdConnectionConfig, logger *logrus.Logger) *EtcdInitializer {
func NewInitializer(restoreOptions *brtypes.RestoreOptions, snapstoreConfig *brtypes.SnapstoreConfig, etcdConnectionConfig *brtypes.EtcdConnectionConfig, logger *logrus.Logger) *EtcdInitializer {
zapLogger, _ := zap.NewProduction()
etcdInit := &EtcdInitializer{
Config: &Config{
SnapstoreConfig: snapstoreConfig,
RestoreOptions: options,
RestoreOptions: restoreOptions,
EtcdConnectionConfig: etcdConnectionConfig,
},
Validator: &validator.DataValidator{
Config: &validator.Config{
DataDir: options.Config.RestoreDataDir,
EmbeddedEtcdQuotaBytes: options.Config.EmbeddedEtcdQuotaBytes,
DataDir: restoreOptions.Config.DataDir,
EmbeddedEtcdQuotaBytes: restoreOptions.Config.EmbeddedEtcdQuotaBytes,
SnapstoreConfig: snapstoreConfig,
},
OriginalClusterSize: options.OriginalClusterSize,
OriginalClusterSize: restoreOptions.OriginalClusterSize,
Logger: logger,
ZapLogger: zapLogger,
},
Expand All @@ -144,7 +144,7 @@ func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *brtypes.Sn
func (e *EtcdInitializer) restoreCorruptData() (bool, error) {
logger := e.Logger
tempRestoreOptions := *(e.Config.RestoreOptions.DeepCopy())
dataDir := tempRestoreOptions.Config.RestoreDataDir
dataDir := tempRestoreOptions.Config.DataDir

if e.Config.SnapstoreConfig == nil || len(e.Config.SnapstoreConfig.Provider) == 0 {
logger.Warnf("No snapstore storage provider configured.")
Expand All @@ -170,9 +170,9 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) {

tempRestoreOptions.BaseSnapshot = baseSnap
tempRestoreOptions.DeltaSnapList = deltaSnapList
tempRestoreOptions.Config.RestoreDataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.RestoreDataDir, "part")
tempRestoreOptions.Config.DataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.DataDir, "part")

if err := e.removeDir(tempRestoreOptions.Config.RestoreDataDir); err != nil {
if err := e.removeDir(tempRestoreOptions.Config.DataDir); err != nil {
return false, fmt.Errorf("failed to delete previous temporary data directory: %v", err)
}

Expand All @@ -196,7 +196,7 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) {
// and false if directory removal failed or if directory
// never existed (bootstrap case)
func (e *EtcdInitializer) restoreWithEmptySnapstore() (bool, error) {
dataDir := e.Config.RestoreOptions.Config.RestoreDataDir
dataDir := e.Config.RestoreOptions.Config.DataDir
e.Logger.Infof("Removing directory(%s) since snapstore is empty.", dataDir)

// If data directory doesn't exist, it means we are bootstrapping
Expand Down Expand Up @@ -248,7 +248,7 @@ func (e *EtcdInitializer) restoreInMultiNode(ctx context.Context) error {
return fmt.Errorf("unable to remove the member %v", err)
}

if err := e.removeDir(e.Config.RestoreOptions.Config.RestoreDataDir); err != nil {
if err := e.removeDir(e.Config.RestoreOptions.Config.DataDir); err != nil {
return fmt.Errorf("unable to remove the data-dir %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnaps
d.Logger.Info("Starting embedded etcd server...")
ro := &brtypes.RestoreOptions{
Config: &brtypes.RestorationConfig{
RestoreDataDir: dataDir,
DataDir: dataDir,
EmbeddedEtcdQuotaBytes: d.Config.EmbeddedEtcdQuotaBytes,
MaxRequestBytes: defaultMaxRequestBytes,
MaxTxnOps: defaultMaxTxnOps,
Expand Down
2 changes: 1 addition & 1 deletion pkg/miscellaneous/miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func getStructuredBackupList(snapList brtypes.SnapList) []backup {
// StartEmbeddedEtcd starts the embedded etcd server.
func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*embed.Etcd, error) {
cfg := embed.NewConfig()
cfg.Dir = filepath.Join(ro.Config.RestoreDataDir)
cfg.Dir = filepath.Join(ro.Config.DataDir)
DefaultListenPeerURLs := "http://localhost:0"
DefaultListenClientURLs := "http://localhost:0"
DefaultInitialAdvertisePeerURLs := "http://localhost:0"
Expand Down
Loading