Skip to content

Commit

Permalink
Compaction of stale files half completed
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohit-Karki committed Nov 29, 2023
1 parent fac23ed commit 4a91b59
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 3 deletions.
99 changes: 99 additions & 0 deletions compact.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,101 @@
package bitcaspy

import (
"fmt"
"os"
"time"

datafile "rohit.com/internal"
)

// checkFileSize delegates to rotateDf checks the file size for the mac file size
// then places it into stale data files and creates a new data file
func (b *BitCaspy) checkFileSize(evalInterval time.Duration) {
var (
evalTicker = time.NewTicker(evalInterval).C
)

for range evalTicker {
if err := b.rotateDf(); err != nil {
fmt.Errorf("failed to scan the active file: %v", err)
}
}
}

// rotateDf checks the file size for the mac file size
// then places it into stale data files and creates a new data file
func (b *BitCaspy) rotateDf() error {
b.Lock()
defer b.Unlock()

size, err := b.df.Size()
if err != nil {
return err
}

// If smaller than threshold no action
if size < 20000 {
return nil
}
oldId := b.df.ID()

b.stale[oldId] = b.df
newDf, err := datafile.New("rohit", oldId+1)
if err != nil {
return err
}
b.df = newDf
return nil
}

// Compacts the old stale data files
func (b *BitCaspy) compaction(evalInterval time.Duration) {
var (
evalTicker = time.NewTicker(evalInterval).C
)
for range evalTicker {
b.Lock()

if err := b.deleteIfExpired(); err != nil {
fmt.Errorf("failed to delete the expired keys: %v", err)
}

// Merge the datafiles
if err := b.merge(); err != nil {

}

}
}

func (b *BitCaspy) deleteIfExpired() error {
// Iterate over all keys and delete all keys which are expired.
keyDir := b.KeyDir
for k := range keyDir {
record, err := b.get(k)
if err != nil {
fmt.Errorf("error %v", err)
}
if record.isExpired() {
if err := b.delete(k); err != nil {
return err
}
}
}
return nil
}

func (b *BitCaspy) merge() error {
// Create a new datafile for storing the output of merged files.
// Use a temp directory to store the file and move to main directory after merge is over.
tmpMergeDir, err := os.MkdirTemp("", "merged")
if err != nil {
return err
}

newFile, err := datafile.New(tmpMergeDir, 0)
if err != nil {
fmt.Errorf("Error creating new datafile: %v", err)
}

}
2 changes: 1 addition & 1 deletion header.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (r *Record) isExpired() bool {
if r.Header.expiry == 0 {
return false
}
return int64(r.Header.expiry) < time.Now().Unix()
return int64(r.Header.expiry) < time.Now().Unix()
}

func (r *Record) isValidChecksum() bool {
Expand Down
4 changes: 2 additions & 2 deletions ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (b *BitCaspy) put(df *datafile.DataFile, Key string, Value []byte, expiryTi
return nil
}

func (b *BitCaspy) delete(df *datafile.DataFile, Key string) error {
if err := b.put(df, Key, nil, nil); err != nil {
func (b *BitCaspy) delete(Key string) error {
if err := b.put(b.df, Key, nil, nil); err != nil {
return fmt.Errorf("Error deleting the key: %v", err)
}
delete(b.KeyDir, Key)
Expand Down

0 comments on commit 4a91b59

Please sign in to comment.