Skip to content

Commit

Permalink
Use condition variable in NIOThreadPool (#2964)
Browse files Browse the repository at this point in the history
Motivation:

The thread performance checker warns that a threads with QoS are waiting
on threads in the NIOThreadPool. We can avoid these warnings by using a
condition variable instead.

Modifications:

- Replace the usage of a semaphore and lock in NIOThreadPool with a
condition lock.
- The condition value indicates whether the threads have work to do
(where work is processing work items or exiting their run loop).

Result:

- Fewer warnings
- Resolves #2960
  • Loading branch information
glbrntt authored Nov 19, 2024
1 parent ae87ce5 commit af0e612
Showing 1 changed file with 141 additions and 46 deletions.
187 changes: 141 additions & 46 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,22 @@ public final class NIOThreadPool {
/// It should never be "leaked" outside of the lock block.
case modifying
}
private let semaphore = DispatchSemaphore(value: 0)
private let lock = NIOLock()
private var threads: [NIOThread]? = nil // protected by `lock`

/// Whether threads in the pool have work.
private enum WorkState: Hashable {
case hasWork
case hasNoWork
}

// The condition lock is used in place of a lock and a semaphore to avoid warnings from the
// thread performance checker.
//
// Only the worker threads wait for the condition lock to take a value, no other threads need
// to wait for a given value. The value indicates whether the thread has some work to do. Work
// in this case can be either processing a work item or exiting the threads processing
// loop (i.e. shutting down).
private let conditionLock: ConditionLock<WorkState>
private var threads: [NIOThread]? = nil // protected by `conditionLock`
private var state: State = .stopped

// WorkItems don't have a handle so they can't be cancelled directly. Instead an ID is assigned
Expand Down Expand Up @@ -124,7 +137,7 @@ public final class NIOThreadPool {
return
}

let threadsToJoin = self.lock.withLock { () -> [NIOThread] in
let threadsToJoin = self.conditionLock.withLock {
switch self.state {
case .running(let items):
self.state = .modifying
Expand All @@ -133,17 +146,17 @@ public final class NIOThreadPool {
item.workItem(.cancelled)
}
}
self.state = .shuttingDown(Array(repeating: true, count: numberOfThreads))
for _ in (0..<numberOfThreads) {
self.semaphore.signal()
}
self.state = .shuttingDown(Array(repeating: true, count: self.numberOfThreads))

let threads = self.threads!
defer {
self.threads = nil
}
return threads
self.threads = nil

// Each thread has work to do: shutdown.
return (unlockWith: .hasWork, result: threads)

case .shuttingDown, .stopped:
return []
return (unlockWith: nil, result: [])

case .modifying:
fatalError(".modifying state misuse")
}
Expand Down Expand Up @@ -171,22 +184,33 @@ public final class NIOThreadPool {
}

private func _submit(id: Int?, _ body: @escaping WorkItem) {
let item = self.lock.withLock { () -> WorkItem? in
let submitted = self.conditionLock.withLock {
let workState: WorkState
let submitted: Bool

switch self.state {
case .running(var items):
self.state = .modifying
items.append(.init(workItem: body, id: id))
self.state = .running(items)
self.semaphore.signal()
return nil
workState = items.isEmpty ? .hasNoWork : .hasWork
submitted = true

case .shuttingDown, .stopped:
return body
workState = .hasNoWork
submitted = false

case .modifying:
fatalError(".modifying state misuse")
}

return (unlockWith: workState, result: submitted)
}

// if item couldn't be added run it immediately indicating that it couldn't be run
item.map { $0(.cancelled) }
if !submitted {
body(.cancelled)
}
}

/// Initialize a `NIOThreadPool` thread pool with `numberOfThreads` threads.
Expand All @@ -209,17 +233,18 @@ public final class NIOThreadPool {
private init(numberOfThreads: Int, canBeStopped: Bool) {
self.numberOfThreads = numberOfThreads
self.canBeStopped = canBeStopped
self.conditionLock = ConditionLock(value: .hasNoWork)
}

private func process(identifier: Int) {
var itemAndState: (item: WorkItem, state: WorkItemState)? = nil

repeat {
// wait until work has become available
itemAndState = nil // ensure previous work item is not retained for duration of semaphore wait
self.semaphore.wait()
itemAndState = nil // ensure previous work item is not retained while waiting for the condition
itemAndState = self.conditionLock.withLock(when: .hasWork) {
let workState: WorkState
let result: (WorkItem, WorkItemState)?

itemAndState = self.lock.withLock { () -> (WorkItem, WorkItemState)? in
switch self.state {
case .running(var items):
self.state = .modifying
Expand All @@ -233,18 +258,32 @@ public final class NIOThreadPool {
}

self.state = .running(items)
return (itemAndID.workItem, state)

workState = items.isEmpty ? .hasNoWork : .hasWork
result = (itemAndID.workItem, state)

case .shuttingDown(var aliveStates):
self.state = .modifying
assert(aliveStates[identifier])
aliveStates[identifier] = false
self.state = .shuttingDown(aliveStates)
return nil

// Unlock with '.hasWork' to resume any other threads waiting to shutdown.
workState = .hasWork
result = nil

case .stopped:
return nil
// Unreachable: 'stopped' is the initial state which is left when starting the
// thread pool, and before any thread calls this function.
fatalError("Invalid state")

case .modifying:
fatalError(".modifying state misuse")
}

return (unlockWith: workState, result: result)
}

// if there was a work item popped, run it
itemAndState.map { item, state in item(state) }
} while itemAndState != nil
Expand All @@ -256,16 +295,24 @@ public final class NIOThreadPool {
}

public func _start(threadNamePrefix: String) {
let alreadyRunning: Bool = self.lock.withLock {
let alreadyRunning = self.conditionLock.withLock {
switch self.state {
case .running(_):
return true
case .shuttingDown(_):
case .running:
// Already running, this has no effect on whether there is more work for the
// threads to run.
return (unlockWith: nil, result: true)

case .shuttingDown:
// This should never happen
fatalError("start() called while in shuttingDown")

case .stopped:
self.state = .running(Deque(minimumCapacity: 16))
return false
assert(self.threads == nil)
self.threads = []
self.threads!.reserveCapacity(self.numberOfThreads)
return (unlockWith: .hasNoWork, result: false)

case .modifying:
fatalError(".modifying state misuse")
}
Expand All @@ -278,31 +325,50 @@ public final class NIOThreadPool {
// We use this condition lock as a tricky kind of semaphore.
// This is done to sidestep the thread performance checker warning
// that would otherwise be emitted.
let cond = ConditionLock(value: 0)

self.lock.withLock {
assert(self.threads == nil)
self.threads = []
self.threads?.reserveCapacity(self.numberOfThreads)
}

let readyThreads = ConditionLock(value: 0)
for id in 0..<self.numberOfThreads {
// We should keep thread names under 16 characters because Linux doesn't allow more.
NIOThread.spawnAndRun(name: "\(threadNamePrefix)\(id)", detachThread: false) { thread in
self.lock.withLock {
self.threads!.append(thread)
cond.lock()
cond.unlock(withValue: self.threads!.count)
readyThreads.withLock {
let threadCount = self.conditionLock.withLock {
self.threads!.append(thread)
let workState: WorkState

switch self.state {
case .running(let items):
workState = items.isEmpty ? .hasNoWork : .hasWork
case .shuttingDown:
// The thread has work to do: it's shutting down.
workState = .hasWork
case .stopped:
// Unreachable: .stopped always transitions to .running in the function
// and .stopped is never entered again.
fatalError("Invalid state")
case .modifying:
fatalError(".modifying state misuse")
}

let threadCount = self.threads!.count
return (unlockWith: workState, result: threadCount)
}

return (unlockWith: threadCount, result: ())
}

self.process(identifier: id)
return ()
}
}

cond.lock(whenValue: self.numberOfThreads)
cond.unlock()
assert(self.lock.withLock { self.threads?.count ?? -1 } == self.numberOfThreads)
readyThreads.lock(whenValue: self.numberOfThreads)
readyThreads.unlock()

func threadCount() -> Int {
self.conditionLock.withLock {
(unlockWith: nil, result: self.threads?.count ?? -1)
}
}
assert(threadCount() == self.numberOfThreads)
}

deinit {
Expand Down Expand Up @@ -374,8 +440,9 @@ extension NIOThreadPool {
}
}
} onCancel: {
self.lock.withLockVoid {
self.conditionLock.withLock {
self.cancelledWorkIDs.insert(workID)
return (unlockWith: nil, result: ())
}
}
}
Expand Down Expand Up @@ -427,3 +494,31 @@ extension NIOThreadPool {
}
}
}

extension ConditionLock {
@inlinable
func _lock(when value: T?) {
if let value = value {
self.lock(whenValue: value)
} else {
self.lock()
}
}

@inlinable
func _unlock(with value: T?) {
if let value = value {
self.unlock(withValue: value)
} else {
self.unlock()
}
}

@inlinable
func withLock<Result>(when value: T? = nil, _ body: () -> (unlockWith: T?, result: Result)) -> Result {
self._lock(when: value)
let (unlockValue, result) = body()
self._unlock(with: unlockValue)
return result
}
}

0 comments on commit af0e612

Please sign in to comment.