diff --git a/cache.go b/cache.go index 1131ddf3..efe2986c 100644 --- a/cache.go +++ b/cache.go @@ -101,7 +101,7 @@ func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) { val.setExpireAt(sxat) } a.store.Set(key+cmd, val) - flight.set(val, nil) + flight.setVal(val) entries[cmd] = nil } a.mu.Unlock() @@ -112,7 +112,7 @@ func (a *adapter) Cancel(key, cmd string, err error) { a.mu.Lock() entries := a.flights[key] if flight, ok := entries[cmd].(*adapterEntry); ok { - flight.set(RedisMessage{}, err) + flight.setErr(err) entries[cmd] = nil } a.mu.Unlock() @@ -154,7 +154,7 @@ func (a *adapter) Close(err error) { for _, entries := range flights { for _, e := range entries { if e != nil { - e.(*adapterEntry).set(RedisMessage{}, err) + e.(*adapterEntry).setErr(err) } } } @@ -167,8 +167,13 @@ type adapterEntry struct { xat int64 } -func (a *adapterEntry) set(val RedisMessage, err error) { - a.err, a.val = err, val +func (a *adapterEntry) setVal(val RedisMessage) { + a.val = val + close(a.ch) +} + +func (a *adapterEntry) setErr(err error) { + a.err = err close(a.ch) } @@ -202,27 +207,27 @@ type flatentry struct { func (f *flatentry) insert(e *flatentry) { f.size += e.size f.mu.Lock() - defer f.mu.Unlock() e.ovfl = f.ovfl f.ovfl = e + f.mu.Unlock() } func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) { - if f == nil { - return - } - if ts >= f.ttl { - expired = true - return - } - if cmd == f.cmd { - _ = ret.CacheUnmarshalView(f.val) - return + for next := f; next != nil; { + if ts >= next.ttl { + expired = true + return + } + if cmd == next.cmd { + _ = ret.CacheUnmarshalView(next.val) + return + } + next.mu.RLock() + ovfl := next.ovfl + next.mu.RUnlock() + next = ovfl } - f.mu.RLock() - ovfl := f.ovfl - f.mu.RUnlock() - return ovfl.find(cmd, ts) + return } const lrBatchSize = 64 @@ -246,7 +251,9 @@ func NewFlattenCache(limit int) CacheStore { f.lrup = sync.Pool{New: func() any { b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} runtime.SetFinalizer(b, func(b *lrBatch) { + f.mu.Lock() f.llTailBatch(b) + f.mu.Unlock() }) return b }} @@ -287,11 +294,9 @@ func (f *flatten) llTail(e *flatentry) { } func (f *flatten) llTailBatch(b *lrBatch) { - f.mu.Lock() for e := range b.m { f.llTail(e) } - f.mu.Unlock() clear(b.m) } @@ -310,7 +315,9 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red batch := f.lrup.Get().(*lrBatch) batch.m[e] = struct{}{} if len(batch.m) == lrBatchSize { + f.mu.Lock() f.llTailBatch(batch) + f.mu.Unlock() } f.lrup.Put(batch) return v, nil @@ -373,7 +380,7 @@ func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { } } f.mu.Unlock() - af.set(val, nil) + af.setVal(val) } return sxat } @@ -384,7 +391,7 @@ func (f *flatten) Cancel(key, cmd string, err error) { defer f.mu.Unlock() if af := f.flights[fk]; af != nil { delete(f.flights, fk) - af.set(RedisMessage{}, err) + af.setErr(err) } } @@ -416,6 +423,6 @@ func (f *flatten) Close(err error) { f.mark++ f.mu.Unlock() for _, entry := range flights { - entry.set(RedisMessage{}, err) + entry.setErr(err) } }