From 4a91b59942b14e4de87d09cc6101e518fab4711f Mon Sep 17 00:00:00 2001 From: Rohit-Karki Date: Thu, 30 Nov 2023 00:53:08 +0545 Subject: [PATCH] Compaction of stale files half completed --- compact.go | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ header.go | 2 +- ops.go | 4 +-- 3 files changed, 102 insertions(+), 3 deletions(-) diff --git a/compact.go b/compact.go index 512237b..a10400b 100644 --- a/compact.go +++ b/compact.go @@ -1,2 +1,101 @@ package bitcaspy +import ( + "fmt" + "os" + "time" + + datafile "rohit.com/internal" +) + +// checkFileSize delegates to rotateDf checks the file size for the mac file size +// then places it into stale data files and creates a new data file +func (b *BitCaspy) checkFileSize(evalInterval time.Duration) { + var ( + evalTicker = time.NewTicker(evalInterval).C + ) + + for range evalTicker { + if err := b.rotateDf(); err != nil { + fmt.Errorf("failed to scan the active file: %v", err) + } + } +} + +// rotateDf checks the file size for the mac file size +// then places it into stale data files and creates a new data file +func (b *BitCaspy) rotateDf() error { + b.Lock() + defer b.Unlock() + + size, err := b.df.Size() + if err != nil { + return err + } + + // If smaller than threshold no action + if size < 20000 { + return nil + } + oldId := b.df.ID() + + b.stale[oldId] = b.df + newDf, err := datafile.New("rohit", oldId+1) + if err != nil { + return err + } + b.df = newDf + return nil +} + +// Compacts the old stale data files +func (b *BitCaspy) compaction(evalInterval time.Duration) { + var ( + evalTicker = time.NewTicker(evalInterval).C + ) + for range evalTicker { + b.Lock() + + if err := b.deleteIfExpired(); err != nil { + fmt.Errorf("failed to delete the expired keys: %v", err) + } + + // Merge the datafiles + if err := b.merge(); err != nil { + + } + + } +} + +func (b *BitCaspy) deleteIfExpired() error { + // Iterate over all keys and delete all keys which are expired. + keyDir := b.KeyDir + for k := range keyDir { + record, err := b.get(k) + if err != nil { + fmt.Errorf("error %v", err) + } + if record.isExpired() { + if err := b.delete(k); err != nil { + return err + } + } + } + return nil +} + +func (b *BitCaspy) merge() error { + // Create a new datafile for storing the output of merged files. + // Use a temp directory to store the file and move to main directory after merge is over. + tmpMergeDir, err := os.MkdirTemp("", "merged") + if err != nil { + return err + } + + newFile, err := datafile.New(tmpMergeDir, 0) + if err != nil { + fmt.Errorf("Error creating new datafile: %v", err) + } + +} diff --git a/header.go b/header.go index f6e0084..3ea56c4 100644 --- a/header.go +++ b/header.go @@ -34,7 +34,7 @@ func (r *Record) isExpired() bool { if r.Header.expiry == 0 { return false } - return int64(r.Header.expiry) < time.Now().Unix() + return int64(r.Header.expiry) < time.Now().Unix() } func (r *Record) isValidChecksum() bool { diff --git a/ops.go b/ops.go index ef0c9ea..39dd533 100644 --- a/ops.go +++ b/ops.go @@ -98,8 +98,8 @@ func (b *BitCaspy) put(df *datafile.DataFile, Key string, Value []byte, expiryTi return nil } -func (b *BitCaspy) delete(df *datafile.DataFile, Key string) error { - if err := b.put(df, Key, nil, nil); err != nil { +func (b *BitCaspy) delete(Key string) error { + if err := b.put(b.df, Key, nil, nil); err != nil { return fmt.Errorf("Error deleting the key: %v", err) } delete(b.KeyDir, Key)