diff --git a/nice/nice.go b/nice/nice.go index 84acc7b..14cfe98 100644 --- a/nice/nice.go +++ b/nice/nice.go @@ -1,6 +1,7 @@ package nice import ( + "container/heap" "context" "runtime" "sync" @@ -34,9 +35,11 @@ func WithMaxConcurrency(maxConcurrency int) Option { func NewScheduler(opts ...Option) *Scheduler { s := &Scheduler{ maxConcurrency: runtime.GOMAXPROCS(0), - entries: queue.New[entry](), + entries: new(queue.Q[entry]), } + heap.Init(s.entries) + for _, opt := range opts { opt(s) } @@ -53,10 +56,10 @@ func (s *Scheduler) assessEntries() { return } - entry, has := s.entries.Pop() - if !has { + if s.entries.Len() == 0 { return } + entry := heap.Pop(s.entries).(*queue.Item[entry]).Value() select { case <-entry.cancelChan: @@ -74,13 +77,12 @@ func (s *Scheduler) WaitContext(ctx context.Context, priority int) error { cancelChan := make(chan struct{}) entry := entry{ - priority: priority, waitChan: waitChan, cancelChan: cancelChan, } s.lock.Lock() - s.entries.Push(entry.priority, entry) + heap.Push(s.entries, queue.NewItem(priority, entry)) s.lock.Unlock() go func() { diff --git a/queue/queue.go b/queue/queue.go index f5baa30..2f8977b 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -1,74 +1,48 @@ package queue -type Node[T any] struct { +type Item[T any] struct { value T priority int - - next *Node[T] - prev *Node[T] + index int } -type Q[T any] struct { - head *Node[T] - tail *Node[T] +func NewItem[T any](priority int, value T) *Item[T] { + return &Item[T]{value: value, priority: priority} } -func New[T any]() *Q[T] { - return &Q[T]{} +func (item *Item[T]) Value() T { + return item.value } -func (q *Q[T]) Push(priority int, value T) { - node := &Node[T]{ - value: value, - priority: priority, - } - - if q.head == nil { - q.head = node - q.tail = node - return - } - - if priority < q.head.priority { - node.next = q.head - q.head.prev = node - q.head = node - return - } - - current := q.head - - for current.next != nil && priority >= current.next.priority { - current = current.next - } +type Q[T any] []*Item[T] - node.next = current.next - node.prev = current - current.next = node - - if node.next == nil { - q.tail = node - } else { - node.next.prev = node - } +func (pq Q[T]) Len() int { + return len(pq) } -func (q *Q[T]) Pop() (T, bool) { - if q.head == nil { - var zero T - return zero, false - } - - value := q.head.value +func (pq Q[T]) Less(i, j int) bool { + return pq[i].priority < pq[j].priority +} - if q.head == q.tail { - q.head = nil - q.tail = nil - return value, true - } +func (pq Q[T]) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} - q.head = q.head.next - q.head.prev = nil +func (pq *Q[T]) Push(x any) { + n := len(*pq) + item := x.(*Item[T]) + item.index = n + *pq = append(*pq, item) +} - return value, true +func (pq *Q[T]) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // don't stop the GC from reclaiming the item eventually + item.index = -1 // for safety + *pq = old[0 : n-1] + return item } diff --git a/queue/queue_test.go b/queue/queue_test.go index 542898c..a792301 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -1,6 +1,7 @@ package queue_test import ( + "container/heap" "testing" "github.com/aertje/gonice/queue" @@ -8,18 +9,22 @@ import ( ) func TestOrder(t *testing.T) { - q := queue.New[string]() + q := new(queue.Q[string]) - q.Push(2, "b") - q.Push(1, "a") - q.Push(3, "c") + heap.Init(q) + heap.Push(q, queue.NewItem(2, "b")) + heap.Push(q, queue.NewItem(3, "c")) + heap.Push(q, queue.NewItem(1, "a")) + heap.Push(q, queue.NewItem(2, "b")) + heap.Push(q, queue.NewItem(1, "a")) + heap.Push(q, queue.NewItem(3, "c")) - for _, want := range []string{"a", "b", "c"} { - val, has := q.Pop() - assert.True(t, has) - assert.Equal(t, want, val) + assert.Equal(t, 6, q.Len()) + + for _, want := range []string{"a", "a", "b", "b", "c", "c"} { + item := heap.Pop(q).(*queue.Item[string]) + assert.Equal(t, want, item.Value()) } - _, has := q.Pop() - assert.False(t, has) + assert.Equal(t, 0, q.Len()) }