diff --git a/disktail/disktail.go b/disktail/disktail.go index 3d15404..269a47b 100644 --- a/disktail/disktail.go +++ b/disktail/disktail.go @@ -129,11 +129,12 @@ func NewDiskTail[T any](brokers []string, topic string, codec gtyped.GCodec[T], cache := pebble.NewCache(500 << 20) // << 20 shifts to MB store, err := pebble.Open(path, &pebble.Options{ - DisableWAL: true, - Cache: cache, - TableCache: pebble.NewTableCache(cache, 20, 100), - MemTableSize: uint64(100 << 20), // 100mb memtable to avoid too many compactions - MaxConcurrentCompactions: func() int { return 3 }, + DisableWAL: true, + Cache: cache, + DisableAutomaticCompactions: true, + TableCache: pebble.NewTableCache(cache, 20, 100), + MemTableSize: uint64(100 << 20), // 100mb memtable to avoid too many compactions + MaxConcurrentCompactions: func() int { return 3 }, }) if err != nil { return nil, fmt.Errorf("error opening storage %s: %w", path, err) @@ -300,6 +301,7 @@ func (r *DiskTail[T]) cleaner(ctx context.Context) error { // oldest timestamp based on now - max age oldest := r.config.Clk.Now().Add(-r.config.MaxAge) + log.Printf("cleaning disk tail, removing entries older than %v", oldest) // delete the whole range, starting from the minimal offsetkey if err := r.store.DeleteRange(endOffsetRange, encodeKey(oldest, 0, 0), pebble.NoSync); err != nil { log.Printf("error deleting range: %v", err)