Skip to content

Commit

Permalink
queue: split Dequeue() method into _dequeueSuspended and _dequeueActive
Browse files Browse the repository at this point in the history
Split the Dequeue() operation into _dequeueSuspended and _dequeueActive.
These function encapsulate decreasing the related metric counter.
Previously decrementing the metric had to be done in multiple places instead.
Encapsulating it makes it easier to maintain.

Both functions expect that the lock is hold while they are called.
  • Loading branch information
fho committed Jul 21, 2021
1 parent 70f6965 commit 2f78215
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions internal/autoupdate/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,40 +218,58 @@ func (q *queue) Enqueue(pr *PullRequest) error {
return q._enqueueActive(pr)
}

func (q *queue) _dequeueSuspended(prNumber int) (*PullRequest, error) {
pr, exist := q.suspended[prNumber]

if !exist {
return nil, ErrNotFound
}

delete(q.suspended, prNumber)
q.metrics.SuspendQueueSizeDec()

return pr, nil
}

func (q *queue) _dequeueActive(prNumber int) (removedPR *PullRequest, newFirstPr *PullRequest) {
pr, newFirstElem := q.active.Dequeue(prNumber)
if pr != nil {
q.metrics.ActiveQueueSizeDec()
}

return pr, newFirstElem
}

// Dequeue removes the pull request with the given number from the active or
// suspended list.
// If an update operation is currently running for it, it is canceled.
// If the pull request does not exist in the queue, ErrNotFound is returned.
func (q *queue) Dequeue(prNumber int) (*PullRequest, error) {
q.lock.Lock()

if pr, exist := q.suspended[prNumber]; exist {
logger := q.logger.With(pr.LogFields...)

delete(q.suspended, prNumber)
if pr, err := q._dequeueSuspended(prNumber); err == nil {
q.lock.Unlock()

logger := q.logger.With(pr.LogFields...)
logger.Debug(
"pull request removed from suspend queue",
logfields.Event("pull_request_dequeued"),
)

pr.SetStateUnchangedSince(time.Time{})

q.metrics.SuspendQueueSizeDec()

return pr, nil
} else if errors.Is(err, ErrNotFound) {
q.logger.DPanic("_dequeue_suspended returned unexpected error", zap.Error(err))
}

removed, newFirstElem := q.active.Dequeue(prNumber)
removed, newFirstElem := q._dequeueActive(prNumber)
q.lock.Unlock()

if removed == nil {
return nil, ErrNotFound
}

q.metrics.ActiveQueueSizeDec()

q.cancelActionForPR(prNumber)
removed.SetStateUnchangedSince(time.Time{})

Expand Down Expand Up @@ -280,13 +298,11 @@ func (q *queue) Suspend(prNumber int) error {
q.lock.Lock()
defer q.lock.Unlock()

pr, newFirstElem := q.active.Dequeue(prNumber)
pr, newFirstElem := q._dequeueActive(prNumber)
if pr == nil {
return fmt.Errorf("pr not in active queue: %w", ErrNotFound)
}

q.metrics.ActiveQueueSizeDec()

if _, exist := q.suspended[prNumber]; exist {
q.logger.DPanic("pr was in active and suspend queue, removed it from active queue")
return nil
Expand Down Expand Up @@ -333,12 +349,11 @@ func (q *queue) ResumeAll() {
continue
}

delete(q.suspended, prNum)
_, _ = q._dequeueSuspended(prNum)
logger.Info(
"autoupdates for pr resumed",
logfields.Event("pull_request_updates_resumed"),
)
q.metrics.SuspendQueueSizeDec()
}
}

Expand All @@ -347,18 +362,13 @@ func (q *queue) ResumeAll() {
// If the pull request is the only active pull request, the update operation is run for it.
func (q *queue) Resume(prNumber int) error {
q.lock.Lock()
pr, exist := q.suspended[prNumber]
if exist {
delete(q.suspended, prNumber)
}
pr, err := q._dequeueSuspended(prNumber)
q.lock.Unlock()

if !exist {
return ErrNotFound
if err != nil {
return err
}

q.metrics.SuspendQueueSizeDec()

logger := q.logger.With(pr.LogFields...)

if err := q.Enqueue(pr); err != nil {
Expand All @@ -370,8 +380,6 @@ func (q *queue) Resume(prNumber int) error {
return fmt.Errorf("enqueing previously suspended pr failed: %w", err)
}

q.metrics.ActiveQueueSizeInc()

return nil
}

Expand Down

0 comments on commit 2f78215

Please sign in to comment.