diff --git a/Bitcaspy.go b/Bitcaspy.go
index 14fe4b5..4efafa0 100644
--- a/Bitcaspy.go
+++ b/Bitcaspy.go
@@ -1,7 +1,10 @@
 package bitcaspy
 
 import (
+	"bytes"
+	"fmt"
 	"os"
+	"path/filepath"
 	"sync"
 
 	"github.com/zerodha/logf"
@@ -18,11 +21,12 @@ type BitCaspy struct {
 	lo      logf.Logger
 	bufPool sync.Pool
+	opts    *Options
 
-	KeyDir KeyDir
-	df     *datafile.DataFile
-	stale  map[int]*datafile.DataFile
-	flockF *os.File
+	KeyDir KeyDir                     // Hashmap of keys and the location of their values, used for lookups.
+	df     *datafile.DataFile         // Active datafile that put operations write to.
+	stale  map[int]*datafile.DataFile // Map of file id to datafiles that are no longer active for put operations.
+	flockF *os.File                   // File handle used to hold the file lock.
 }
 
 func initLogger(debug bool) logf.Logger {
@@ -33,13 +37,84 @@ func initLogger(debug bool) logf.Logger {
 	return logf.New(opts)
 }
 
-func Init() (*BitCaspy, error) {
+func Init(cfg ...Config) (*BitCaspy, error) {
+	// Apply the configuration options on top of the defaults.
+	opts := DefaultOptions()
+
+	for _, opt := range cfg {
+		if err := opt(opts); err != nil {
+			return nil, err
+		}
+	}
 
 	var (
-		index = 0
+		index  = 0
+		lo     = initLogger(opts.debug)
 		flockF *os.File
 		stale  = map[int]*datafile.DataFile{}
 	)
 
 	// load existing data files
-	// file,err :=
-}
\ No newline at end of file
+	datafiles, err := getDataFiles(opts.dir)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing ids for existing files: %w", err)
+	}
+	if len(datafiles) > 0 {
+		ids, err := getIds(datafiles)
+		if err != nil {
+			return nil, fmt.Errorf("error getting existing ids for %s: %w", datafiles, err)
+		}
+
+		index = len(ids)
+
+		// Create an in-memory datafile handle for each existing datafile on disk.
+		for _, id := range ids {
+			df, err := datafile.New(opts.dir, id)
+			if err != nil {
+				return nil, fmt.Errorf("error creating datafile: %w", err)
+			}
+			stale[id] = df
+		}
+	}
+
+	// Create a new active datafile.
+	df, err := datafile.New(opts.dir, index)
+	if err != nil {
+		return nil, fmt.Errorf("error creating new datafile: %w", err)
+	}
+
+	// Create an empty key directory.
+	keyDir := make(KeyDir)
+
+	// If a hints file exists, decode it and load its contents into the key directory.
+	hintPath := filepath.Join(opts.dir, HINTS_FILE)
+	if exists(hintPath) {
+		if err := keyDir.Decode(hintPath); err != nil {
+			return nil, fmt.Errorf("failed to decode hint file %s: %w", hintPath, err)
+		}
+	}
+
+	bc := &BitCaspy{
+		lo: lo,
+		bufPool: sync.Pool{New: func() any {
+			return bytes.NewBuffer([]byte{})
+		}},
+		opts: opts,
+
+		KeyDir: keyDir,
+		df:     df,
+		stale:  stale,
+		flockF: flockF,
+	}
+
+	go bc.runCompaction(bc.opts.compactInterval)
+
+	go bc.checkFileSize(bc.opts.checkFileSizeInterval)
+
+	// if bc.opts.syncInterval != nil {
+	// 	go bc
+	// }
+
+	return bc, nil
+}
diff --git a/KeyDir.go b/KeyDir.go
index 639d78b..516b119 100644
--- a/KeyDir.go
+++ b/KeyDir.go
@@ -8,10 +8,10 @@ import (
 
 type KeyDir map[string]Meta
 
+// NOTE: encoding/gob only serializes exported struct fields; with every field
+// unexported, Encode/Decode cannot round-trip this metadata until the fields
+// are exported.
 type Meta struct {
-	id int
-	value_sz int
+	id        int
+	value_sz  int
 	value_pos int
-	tstamp int
+	tstamp    int
 }
 
 func (k *KeyDir) Encode(fPath string) error {
@@ -27,10 +27,10 @@ func (k *KeyDir) Encode(fPath string) error {
 		return err
 	}
 
-    return nil
+	return nil
 }
 
-func (k *KeyDir) Decode(fPath string) (error) {
-	file, err := os.Create(fPath)
+func (k *KeyDir) Decode(fPath string) error {
+	// Open the existing hints file; os.Create here would truncate it before decoding.
+	file, err := os.Open(fPath)
 	if err != nil {
 		return err
 	}
@@ -43,6 +43,5 @@
 		return err
 	}
 
-    return nil
-
-}
\ No newline at end of file
+	return nil
+}
diff --git a/compact.go b/compact.go
index a10400b..1c105cc 100644
--- a/compact.go
+++ b/compact.go
@@ -3,6 +3,7 @@ package bitcaspy
 import (
 	"fmt"
 	"os"
+	"path/filepath"
 	"time"
 
 	datafile "rohit.com/internal"
@@ -22,6 +23,27 @@ func (b *BitCaspy) checkFileSize(evalInterval time.Duration) {
 	}
 }
 
+// runCompaction periodically compacts keys and cleans up dead or expired
+// keys. This saves disk space and merges old inactive db files into a single
+// file. It also generates a hints file, which helps warm the keydir cache
+// during a cold start.
+func (b *BitCaspy) runCompaction(evalInterval time.Duration) {
+	var (
+		evalTicker = time.NewTicker(evalInterval).C
+	)
+	for range evalTicker {
+		if err := b.deleteIfExpired(); err != nil {
+			b.lo.Error("error deleting expired datafiles", "error", err)
+		}
+		if err := b.merge(); err != nil {
+			b.lo.Error("error merging stale datafiles", "error", err)
+		}
+		if err := b.generateHintFiles(); err != nil {
+			b.lo.Error("error generating hints file", "error", err)
+		}
+	}
+}
+
-// rotateDf checks the file size for the mac file size
+// rotateDf checks the active file size against the max file size
 // then places it into stale data files and creates a new data file
 func (b *BitCaspy) rotateDf() error {
@@ -40,7 +62,7 @@ func (b *BitCaspy) rotateDf() error {
 	oldId := b.df.ID()
 	b.stale[oldId] = b.df
 
-	newDf, err := datafile.New("rohit", oldId+1)
+	newDf, err := datafile.New(b.opts.dir, oldId+1)
 	if err != nil {
 		return err
 	}
@@ -48,24 +70,15 @@ func (b *BitCaspy) rotateDf() error {
 	return nil
 }
 
-// Compacts the old stale data files
-func (b *BitCaspy) compaction(evalInterval time.Duration) {
-	var (
-		evalTicker = time.NewTicker(evalInterval).C
-	)
-	for range evalTicker {
-		b.Lock()
-
-		if err := b.deleteIfExpired(); err != nil {
-			fmt.Errorf("failed to delete the expired keys: %v", err)
-		}
-
-		// Merge the datafiles
-		if err := b.merge(); err != nil {
-
-		}
-	}
+// generateHintFiles encodes the keydir hashmap into a gob hints file.
+func (b *BitCaspy) generateHintFiles() error {
+	hintFile := filepath.Join(b.opts.dir, HINTS_FILE)
+	if err := b.KeyDir.Encode(hintFile); err != nil {
+		return err
+	}
+	return nil
 }
 
 func (b *BitCaspy) deleteIfExpired() error {
@@ -86,6 +99,10 @@
 }
 
 func (b *BitCaspy) merge() error {
+	// Only merge when there are at least two stale datafiles.
+	if len(b.stale) < 2 {
+		return nil
+	}
 	// Create a new datafile for storing the output of merged files.
 	// Use a temp directory to store the file and move to main directory after merge is over.
 	tmpMergeDir, err := os.MkdirTemp("", "merged")
@@ -93,9 +110,61 @@
 		return err
 	}
+	defer os.RemoveAll(tmpMergeDir)
 
 	newFile, err := datafile.New(tmpMergeDir, 0)
 	if err != nil {
-		fmt.Errorf("Error creating new datafile: %v", err)
+		return fmt.Errorf("error creating new datafile: %w", err)
 	}
 
+	// Loop over all the active keys in the keydir. Since the keydir always
+	// holds the latest entry for every key, stale versions of keys that were
+	// expired, deleted, or overwritten are dropped from the merged database.
+
+	for k := range b.KeyDir {
+		record, err := b.get(k)
+		if err != nil {
+			return err
+		}
+
+		if err := b.put(newFile, k, record.Value, nil); err != nil {
+			return err
+		}
+	}
+
+	// Close all the stale datafiles since they have been merged into the new datafile.
+	for _, df := range b.stale {
+		if err := df.Close(); err != nil {
+			return err
+		}
+	}
+
+	// Reset the stale map since all the stale datafiles have been merged into the new datafile.
+	b.stale = make(map[int]*datafile.DataFile)
+
+	// Delete all the old .db datafiles.
+	err = filepath.Walk(b.opts.dir, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if info.IsDir() {
+			return nil
+		}
+
+		if filepath.Ext(path) == ".db" {
+			if err := os.Remove(path); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Move the new merged datafile into the database directory.
+	if err := os.Rename(filepath.Join(tmpMergeDir, fmt.Sprintf(datafile.ACTIVE_DATAFILE, 0)),
+		filepath.Join(b.opts.dir, fmt.Sprintf(datafile.ACTIVE_DATAFILE, 0))); err != nil {
+		return err
+	}
+
+	// The newFile handle stays valid across the rename: the fd tracks the
+	// file, not its path.
+	b.df = newFile
+
+	if err := b.df.Sync(); err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..f0d796d
--- /dev/null
+++ b/config.go
@@ -0,0 +1,57 @@
+package bitcaspy
+
+import "time"
+
+const (
+	defaultSyncInterval      = time.Minute * 1
+	defaultCompactInterval   = time.Hour * 6
+	defaultFileSizeInterval  = time.Minute * 1
+	defaultMaxActiveFileSize = int64(1 << 32) // 4GB.
+)
+
+// Options represents configuration options for managing a datastore.
+type Options struct {
+	debug                 bool           // Enable debug logging.
+	dir                   string         // Path for storing data files.
+	readOnly              bool           // Whether this datastore should be opened in read-only mode. Only one process at a time can open it in R-W mode.
+	alwaysFSync           bool           // Whether to flush the filesystem buffer after every write.
+	syncInterval          *time.Duration // Interval to sync the active file on disk.
+	compactInterval       time.Duration  // Interval to compact old files.
+	checkFileSizeInterval time.Duration  // Interval to check the file size of the active DB.
+	maxActiveFileSize     int64          // Max size of the active file in bytes. On exceeding this size it's rotated.
+}
+
+func DefaultOptions() *Options {
+	return &Options{
+		debug:                 false,
+		dir:                   ".",
+		readOnly:              false,
+		alwaysFSync:           false,
+		maxActiveFileSize:     defaultMaxActiveFileSize,
+		compactInterval:       defaultCompactInterval,
+		checkFileSizeInterval: defaultFileSizeInterval,
+	}
+}
+
+type Config func(*Options) error
+
+func WithDebug() Config {
+	return func(o *Options) error {
+		o.debug = true
+		return nil
+	}
+}
+
+func WithAlwaysSync() Config {
+	return func(o *Options) error {
+		o.alwaysFSync = true
+		return nil
+	}
+}
+
+func WithReadOnly() Config {
+	return func(o *Options) error {
+		o.readOnly = true
+		return nil
+	}
+}
\ No newline at end of file
diff --git a/merge.go b/merge.go
deleted file mode 100644
index f9092ae..0000000
--- a/merge.go
+++ /dev/null
@@ -1 +0,0 @@
-package bitcaspy
diff --git a/ops.go b/ops.go
index 39dd533..d18efc2 100644
--- a/ops.go
+++ b/ops.go
@@ -81,7 +81,7 @@ func (b *BitCaspy) put(df *datafile.DataFile, Key string, Value []byte, expiryTi
 		return fmt.Errorf("Error writing the Record to the data file: %v", err)
 	}
 
-	// Creating the meta objec of the keydir
+	// Creating the meta object of the keydir
 	meta := Meta{
 		id:       df.ID(),
 		value_sz: len(Value),
diff --git a/utils.go b/utils.go
index f2a63de..f870077 100644
--- a/utils.go
+++ b/utils.go
@@ -19,7 +19,7 @@ func exists(path string) bool {
 }
 
 // returns the list of files in the database directory
-func getDatFiles(outDir string) ([]string, error) {
+func getDataFiles(outDir string) ([]string, error) {
 	if !exists(outDir) {
 		return nil, fmt.Errorf("Error finding the file %s", outDir)
 	}
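
For reviewers, a minimal sketch of how the reworked Init and the new functional options in config.go are meant to be used together. The import path is an assumption inferred from the "rohit.com/internal" import above and may differ; Put/Get/Close are outside the scope of this diff.

package main

import (
	"log"

	bitcaspy "rohit.com" // assumed module path, inferred from "rohit.com/internal"
)

func main() {
	// Open the datastore with DefaultOptions(), overridden by the
	// functional options introduced in config.go.
	db, err := bitcaspy.Init(
		bitcaspy.WithDebug(),      // enable debug logging
		bitcaspy.WithAlwaysSync(), // flush to disk after every write
	)
	if err != nil {
		log.Fatalf("init failed: %v", err)
	}
	_ = db // Put/Get/Close are not part of this diff.
}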
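
A second self-contained sketch of the hints-file mechanism behind generateHintFiles and KeyDir.Decode, illustrating the NOTE added to KeyDir.go: encoding/gob refuses to encode a struct whose fields are all unexported, so the keydir metadata only round-trips once the fields are exported. ExportedMeta is a hypothetical stand-in for Meta, not code from this patch.

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// ExportedMeta mirrors Meta with exported fields so gob can serialize it.
type ExportedMeta struct {
	ID       int
	ValueSz  int
	ValuePos int
	Tstamp   int
}

func main() {
	keyDir := map[string]ExportedMeta{
		"hello": {ID: 0, ValueSz: 5, ValuePos: 42, Tstamp: 1700000000},
	}

	// Encode the keydir, the same way Encode writes the hints file.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(keyDir); err != nil {
		panic(err) // with unexported fields this fails: "type has no exported fields"
	}

	// Decode it back, as Decode does on a cold start.
	decoded := map[string]ExportedMeta{}
	if err := gob.NewDecoder(&buf).Decode(&decoded); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", decoded)
}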