Commit: Nearly Completed

Rohit-Karki committed Nov 30, 2023
1 parent 4a91b59 commit 8ccf383
Showing 7 changed files with 235 additions and 36 deletions.
91 changes: 83 additions & 8 deletions Bitcaspy.go
@@ -1,7 +1,10 @@
package bitcaspy

import (
"bytes"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/zerodha/logf"
@@ -18,11 +21,12 @@ type BitCaspy struct {

lo logf.Logger
bufPool sync.Pool
opts *Options

KeyDir KeyDir // Hashmap of keys and the location of the value, used for lookups
df *datafile.DataFile // Active datafile, where put operations are performed
stale map[int]*datafile.DataFile // Hashmap of fileId to datafiles that are not currently active for put operations
flockF *os.File // File handle used for the file lock
}

func initLogger(debug bool) logf.Logger {
@@ -33,13 +37,84 @@ func initLogger(debug bool) {
return logf.New(opts)
}

func Init(cfg ...Config) (*BitCaspy, error) {
// Set options
opts := DefaultOptions()

for _, opt := range cfg {
if err := opt(opts); err != nil {
return nil, err
}
}
var (
index = 0
lo = initLogger(opts.debug)
flockF *os.File
stale = map[int]*datafile.DataFile{}
)

// Load the existing datafiles from the database directory.
datafiles, err := getDataFiles(opts.dir)

if err != nil {
return nil, fmt.Errorf("error parsing ids for existing files: %w", err)
}
if len(datafiles) > 0 {
ids, err := getIds(datafiles)

if err != nil {
return nil, fmt.Errorf("error getting existing ids for %s: %w", datafiles, err)
}

index = len(ids)

// Create an in-memory datafile handle for each existing datafile on disk
for _, id := range ids {
df, err := datafile.New(opts.dir, id)
if err != nil {
return nil, fmt.Errorf("Error creating datafile: %v", err)
}
stale[id] = df
}
}

// Create a new active datafile
df, err := datafile.New(opts.dir, index)
if err != nil {
fmt.Errorf("error creating new datafile: %v", err)
}

// Create an empty key directory
KeyDir := make(KeyDir)

// If a hint file exists, decode it and load the stored hashmap into the key directory
hintPath := filepath.Join(opts.dir, HINTS_FILE)
if exists(hintPath) {
if err := KeyDir.Decode(hintPath); err != nil {
return nil, fmt.Errorf("failed to decode hint file %s: %v", hintPath, err)
}
}

BitCaspy := &BitCaspy{
lo: lo,
bufPool: sync.Pool{New: func() any {
return bytes.NewBuffer([]byte{})
}},
opts: opts,

KeyDir: KeyDir,
df: df,
stale: stale,
flockF: flockF,
}

go BitCaspy.runCompaction(BitCaspy.opts.compactInterval)

go BitCaspy.checkFileSize(BitCaspy.opts.checkFileSizeInterval)

// TODO: if opts.syncInterval is set, start a goroutine that
// syncs the active datafile at that interval.

return BitCaspy, nil
}
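As a usage note: below is a minimal sketch of how Init and its variadic Config options would be called from client code. The root module's import path isn't shown in this commit, so rohit.com is an assumption inferred from the rohit.com/internal import in compact.go.

package main

import (
	"log"

	bitcaspy "rohit.com" // assumed module path, inferred from "rohit.com/internal"
)

func main() {
	// Open the store with a couple of the options defined in config.go;
	// with no arguments, Init falls back to DefaultOptions.
	db, err := bitcaspy.Init(bitcaspy.WithDebug(), bitcaspy.WithAlwaysSync())
	if err != nil {
		log.Fatalf("init failed: %v", err)
	}
	_ = db // Put/Get calls against the store would go here.
}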
15 changes: 7 additions & 8 deletions KeyDir.go
@@ -8,10 +8,10 @@ import (
type KeyDir map[string]Meta

type Meta struct {
id        int
value_sz  int
value_pos int
tstamp    int
}

func (k *KeyDir) Encode(fPath string) error {
Expand All @@ -27,10 +27,10 @@ func (k *KeyDir) Encode(fPath string) error {
return err
}

return nil
}

func (k *KeyDir) Decode(fPath string) error {
// Open the existing hint file for reading; os.Create would truncate it.
file, err := os.Open(fPath)
if err != nil {
return err
@@ -43,6 +43,5 @@ func (k *KeyDir) Decode(fPath string) error {
return err
}

return nil
}
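The compaction code below describes encoding the keyDir hashmap "into gob". If Encode/Decode are indeed backed by encoding/gob, one caveat applies: gob serializes only exported fields and refuses a struct that has none, so Meta's all-lowercase fields could not be encoded as-is. A minimal standalone sketch of the round trip, using a hypothetical exported mirror struct HintMeta:

package main

import (
	"encoding/gob"
	"fmt"
	"os"
)

// HintMeta mirrors Meta with exported fields; encoding/gob errors out
// on a struct with no exported fields, so Meta{id, value_sz, ...}
// would not survive the trip as written.
type HintMeta struct {
	ID       int
	ValueSz  int
	ValuePos int
	Tstamp   int
}

func main() {
	keyDir := map[string]HintMeta{
		"hello": {ID: 0, ValueSz: 5, ValuePos: 128, Tstamp: 1701302400},
	}

	// Encode the hashmap into a hints file, as Encode would.
	f, err := os.Create("hints.gob")
	if err != nil {
		panic(err)
	}
	if err := gob.NewEncoder(f).Encode(keyDir); err != nil {
		panic(err)
	}
	f.Close()

	// Decode it back on a cold start, as Decode would; note the file
	// is opened for reading, not created.
	f, err = os.Open("hints.gob")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	loaded := map[string]HintMeta{}
	if err := gob.NewDecoder(f).Decode(&loaded); err != nil {
		panic(err)
	}
	fmt.Println(loaded["hello"].ValuePos) // 128
}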
103 changes: 86 additions & 17 deletions compact.go
@@ -3,6 +3,7 @@ package bitcaspy
import (
"fmt"
"os"
"path/filepath"
"time"

datafile "rohit.com/internal"
@@ -22,6 +23,27 @@ func (b *BitCaspy) checkFileSize(evalInterval time.Duration) {
}
}

// runCompaction runs a cleanup process that compacts keys and removes
// dead/expired keys at a periodic interval. This helps save disk space
// and merge old inactive db files into a single file. It also generates
// a hints file, which helps cache all the keys during a cold start.
func (b *BitCaspy) runCompaction(evalInterval time.Duration) {
var (
evalTicker = time.NewTicker(evalInterval).C
)
for range evalTicker {
// fmt.Errorf only constructs an error value; log these failures
// via the zerodha/logf logger instead of discarding them.
if err := b.deleteIfExpired(); err != nil {
b.lo.Error("error deleting expired datafiles", "error", err)
}
if err := b.merge(); err != nil {
b.lo.Error("error merging stale datafiles", "error", err)
}
if err := b.generateHintFiles(); err != nil {
b.lo.Error("error generating hint file", "error", err)
}
}
}

// rotateDf checks whether the active file has exceeded the max file size;
// if so, it moves it into the stale datafiles and creates a new active datafile
func (b *BitCaspy) rotateDf() error {
@@ -40,32 +62,23 @@ func (b *BitCaspy) rotateDf() error {
oldId := b.df.ID()

b.stale[oldId] = b.df
newDf, err := datafile.New("rohit", oldId+1)
newDf, err := datafile.New(b.opts.dir, oldId+1)
if err != nil {
return err
}
b.df = newDf
return nil
}

// generateHintFiles encodes the keyDir hashmap into gob and writes it to
// the hints file, so that keys can be loaded quickly on a cold start.
func (b *BitCaspy) generateHintFiles() error {
hintFile := filepath.Join(b.opts.dir, HINTS_FILE)

err := b.KeyDir.Encode(hintFile)
if err != nil {
return err
}
return nil
}

func (b *BitCaspy) deleteIfExpired() error {
Expand All @@ -86,16 +99,72 @@ func (b *BitCaspy) deleteIfExpired() error {
}

func (b *BitCaspy) merge() error {
// Only merge when stale datafiles are more than 2
if len(b.stale) < 2 {
return nil
}
// Create a new datafile for storing the output of merged files.
// Use a temp directory to store the file and move to main directory after merge is over.
tmpMergeDir, err := os.MkdirTemp("", "merged")
if err != nil {
return err
}

defer os.RemoveAll(tmpMergeDir)
newFile, err := datafile.New(tmpMergeDir, 0)
if err != nil {
fmt.Errorf("Error creating new datafile: %v", err)
}

// Loop over all the active keys in the keydir and write each record into
// the new datafile. Since the keydir holds the latest entry for every key,
// old values that were expired/deleted/overwritten are dropped from the
// merged database.

for k := range b.KeyDir {
record, err := b.get(k)
if err != nil {
return err
}

if err := b.put(newFile, k, record.Value, nil); err != nil {
return err
}
}

// Close all the stale datafiles, since their contents are now merged into the new datafile
for _, df := range b.stale {
if err := df.Close(); err != nil {
return err
}
}

// Reset the stale map, since the stale datafiles have been merged into the new datafile
b.stale = make(map[int]*datafile.DataFile, 0)

// Delete all old .db datafiles
err = filepath.Walk(b.opts.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}

if filepath.Ext(path) == ".db" {
err := os.Remove(path)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}

// Move the new merged datafile from the temp directory into the database directory
if err := os.Rename(filepath.Join(tmpMergeDir, fmt.Sprintf(datafile.ACTIVE_DATAFILE, 0)),
filepath.Join(b.opts.dir, fmt.Sprintf(datafile.ACTIVE_DATAFILE, 0))); err != nil {
return err
}

b.df = newFile

return b.df.Sync()
}
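A design note on the periodic loops in this file: ranging over time.NewTicker(evalInterval).C means the ticker can never be stopped, so the goroutines launched from Init run until the process exits. Below is a minimal sketch of a stoppable variant; the compactionLoop helper and its stop channel are hypothetical and not part of this commit.

// compactionLoop is a hypothetical, stoppable variant of the ticker
// loops in compact.go. The caller closes stop on shutdown, which lets
// the goroutine return and releases the ticker via Stop.
func compactionLoop(interval time.Duration, stop <-chan struct{}, tick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release ticker resources when the loop exits

	for {
		select {
		case <-ticker.C:
			tick()
		case <-stop:
			return
		}
	}
}

Init would then keep the stop channel and close it from a Close method, e.g. go compactionLoop(opts.compactInterval, stop, func() { b.merge() }).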
57 changes: 57 additions & 0 deletions config.go
@@ -0,0 +1,57 @@
package bitcaspy

import "time"

const (
defaultSyncInterval = time.Minute * 1
defaultCompactInterval = time.Hour * 6
defaultFileSizeInterval = time.Minute * 1
defaultMaxActiveFileSize = int64(1 << 32) // 4GB.
)

// Options represents configuration options for managing a datastore.
type Options struct {
debug bool // Enable debug logging.
dir string // Path for storing data files.
readOnly bool // Whether this datastore should be opened in a read-only mode. Only one process at a time can open it in R-W mode.
alwaysFSync bool // Whether to flush the filesystem buffer after every write.
syncInterval *time.Duration // Interval to sync the active file on disk.
compactInterval time.Duration // Interval to compact old files.
checkFileSizeInterval time.Duration // Interval to check the file size of the active DB.
maxActiveFileSize int64 // Max size of active file in bytes. On exceeding this size it's rotated.
}

func DefaultOptions() *Options {
return &Options{
debug: false,
dir: ".",
readOnly: false,
alwaysFSync: false,
maxActiveFileSize: defaultMaxActiveFileSize,
compactInterval: defaultCompactInterval,
checkFileSizeInterval: defaultFileSizeInterval,
}
}

type Config func(*Options) error

func WithDebug() Config {
return func(o *Options) error {
o.debug = true
return nil
}
}

func WithAlwaysSync() Config {
return func(o *Options) error {
o.alwaysFSync = true
return nil
}
}

func WithReadOnly() Config {
return func(o *Options) error {
o.readOnly = true
return nil
}
}
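The three helpers above are functional options; an option that carries a value follows the same shape. WithDir below is a hypothetical example, not part of this commit, and it assumes "fmt" is added to config.go's imports for the error value.

// WithDir is a hypothetical value-carrying option in the same style as
// WithDebug and WithReadOnly: validate the argument, then write it into Options.
func WithDir(dir string) Config {
	return func(o *Options) error {
		if dir == "" {
			return fmt.Errorf("dir cannot be empty") // assumes "fmt" is imported
		}
		o.dir = dir
		return nil
	}
}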
1 change: 0 additions & 1 deletion merge.go

This file was deleted.

2 changes: 1 addition & 1 deletion ops.go
@@ -81,7 +81,7 @@ func (b *BitCaspy) put(df *datafile.DataFile, Key string, Value []byte, expiryTi
return fmt.Errorf("Error writing the Record to the data file: %v", err)
}

// Creating the meta object of the keydir
meta := Meta{
id: df.ID(),
value_sz: len(Value),
2 changes: 1 addition & 1 deletion utils.go
@@ -19,7 +19,7 @@ func exists(path string) bool {
}

// returns the list of files in the database directory
func getDataFiles(outDir string) ([]string, error) {
if !exists(outDir) {
return nil, fmt.Errorf("Error finding the file %s", outDir)
}
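getDataFiles is truncated above, so its body isn't visible in this commit. A plausible sketch under stated assumptions — that datafiles carry a .db extension, as the merge cleanup in compact.go suggests, and that path/filepath is imported — rather than the actual implementation:

// getDataFiles returns the paths of the *.db datafiles inside outDir.
// Sketch only: the real body is cut off in this diff.
func getDataFiles(outDir string) ([]string, error) {
	if !exists(outDir) {
		return nil, fmt.Errorf("error finding the database directory %s", outDir)
	}
	return filepath.Glob(filepath.Join(outDir, "*.db"))
}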
