Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use condition variable in NIOThreadPool #2964

Merged
merged 7 commits into from
Nov 19, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 134 additions & 46 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ public final class NIOThreadPool {
/// It should never be "leaked" outside of the lock block.
case modifying
}
private let semaphore = DispatchSemaphore(value: 0)
private let lock = NIOLock()
private var threads: [NIOThread]? = nil // protected by `lock`

/// Whether threads in the pool have work.
private enum WorkState: Hashable {
case hasWork
case hasNoWork
}

private let conditionLock: ConditionLock<WorkState>
private var threads: [NIOThread]? = nil // protected by `conditionLock`
private var state: State = .stopped

// WorkItems don't have a handle so they can't be cancelled directly. Instead an ID is assigned
Expand Down Expand Up @@ -124,7 +130,7 @@ public final class NIOThreadPool {
return
}

let threadsToJoin = self.lock.withLock { () -> [NIOThread] in
let threadsToJoin = self.conditionLock.withLock {
switch self.state {
case .running(let items):
self.state = .modifying
Expand All @@ -133,17 +139,17 @@ public final class NIOThreadPool {
item.workItem(.cancelled)
}
}
self.state = .shuttingDown(Array(repeating: true, count: numberOfThreads))
for _ in (0..<numberOfThreads) {
self.semaphore.signal()
}
self.state = .shuttingDown(Array(repeating: true, count: self.numberOfThreads))

let threads = self.threads!
defer {
self.threads = nil
}
return threads
self.threads = nil

// Each thread has work to do: shutdown.
return (unlockWith: .hasWork, result: threads)

case .shuttingDown, .stopped:
return []
return (unlockWith: nil, result: [])

case .modifying:
fatalError(".modifying state misuse")
}
Expand Down Expand Up @@ -171,22 +177,33 @@ public final class NIOThreadPool {
}

private func _submit(id: Int?, _ body: @escaping WorkItem) {
let item = self.lock.withLock { () -> WorkItem? in
let submitted = self.conditionLock.withLock {
let workState: WorkState
let submitted: Bool

switch self.state {
case .running(var items):
self.state = .modifying
items.append(.init(workItem: body, id: id))
self.state = .running(items)
self.semaphore.signal()
return nil
workState = items.isEmpty ? .hasNoWork : .hasWork
submitted = true

case .shuttingDown, .stopped:
return body
workState = .hasWork
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here. It might be useful to write out the strategy for the lock value in a comment above it, which will make it easier to audit.

submitted = false

case .modifying:
fatalError(".modifying state misuse")
}

return (unlockWith: workState, result: submitted)
}

// if item couldn't be added run it immediately indicating that it couldn't be run
item.map { $0(.cancelled) }
if !submitted {
body(.cancelled)
}
}

/// Initialize a `NIOThreadPool` thread pool with `numberOfThreads` threads.
Expand All @@ -209,17 +226,18 @@ public final class NIOThreadPool {
private init(numberOfThreads: Int, canBeStopped: Bool) {
self.numberOfThreads = numberOfThreads
self.canBeStopped = canBeStopped
self.conditionLock = ConditionLock(value: .hasNoWork)
}

private func process(identifier: Int) {
var itemAndState: (item: WorkItem, state: WorkItemState)? = nil

repeat {
// wait until work has become available
itemAndState = nil // ensure previous work item is not retained for duration of semaphore wait
self.semaphore.wait()
itemAndState = nil // ensure previous work item is not retained while waiting for the condition
itemAndState = self.conditionLock.withLock(when: .hasWork) {
let workState: WorkState
let result: (WorkItem, WorkItemState)?

itemAndState = self.lock.withLock { () -> (WorkItem, WorkItemState)? in
switch self.state {
case .running(var items):
self.state = .modifying
Expand All @@ -233,18 +251,32 @@ public final class NIOThreadPool {
}

self.state = .running(items)
return (itemAndID.workItem, state)

workState = items.isEmpty ? .hasNoWork : .hasWork
result = (itemAndID.workItem, state)

case .shuttingDown(var aliveStates):
self.state = .modifying
assert(aliveStates[identifier])
aliveStates[identifier] = false
self.state = .shuttingDown(aliveStates)
return nil

// Unlock with '.hasWork' to resume any other threads waiting to shutdown.
workState = .hasWork
result = nil

case .stopped:
return nil
// Unreachable: 'stopped' is the initial state which is left when starting the
// thread pool, and before any thread calls this function.
fatalError("Invalid state")

case .modifying:
fatalError(".modifying state misuse")
}

return (unlockWith: workState, result: result)
}

// if there was a work item popped, run it
itemAndState.map { item, state in item(state) }
} while itemAndState != nil
Expand All @@ -256,16 +288,24 @@ public final class NIOThreadPool {
}

public func _start(threadNamePrefix: String) {
let alreadyRunning: Bool = self.lock.withLock {
let alreadyRunning = self.conditionLock.withLock {
switch self.state {
case .running(_):
return true
case .shuttingDown(_):
case .running:
// Already running, this has no effect on whether there is more work for the
// threads to run.
return (unlockWith: nil, result: true)

case .shuttingDown:
// This should never happen
fatalError("start() called while in shuttingDown")

case .stopped:
self.state = .running(Deque(minimumCapacity: 16))
return false
assert(self.threads == nil)
self.threads = []
self.threads!.reserveCapacity(self.numberOfThreads)
return (unlockWith: .hasNoWork, result: false)

case .modifying:
fatalError(".modifying state misuse")
}
Expand All @@ -278,31 +318,50 @@ public final class NIOThreadPool {
// We use this condition lock as a tricky kind of semaphore.
// This is done to sidestep the thread performance checker warning
// that would otherwise be emitted.
let cond = ConditionLock(value: 0)

self.lock.withLock {
assert(self.threads == nil)
self.threads = []
self.threads?.reserveCapacity(self.numberOfThreads)
}

let readyThreads = ConditionLock(value: 0)
for id in 0..<self.numberOfThreads {
// We should keep thread names under 16 characters because Linux doesn't allow more.
NIOThread.spawnAndRun(name: "\(threadNamePrefix)\(id)", detachThread: false) { thread in
self.lock.withLock {
self.threads!.append(thread)
cond.lock()
cond.unlock(withValue: self.threads!.count)
readyThreads.withLock {
let threadCount = self.conditionLock.withLock {
self.threads!.append(thread)
let workState: WorkState

switch self.state {
case .running(let items):
workState = items.isEmpty ? .hasNoWork : .hasWork
case .shuttingDown:
// The thread has work to do: it's shutting down.
workState = .hasWork
case .stopped:
// Unreachable: .stopped always transitions to .running in the function
// and .stopped is never entered again.
fatalError("Invalid state")
case .modifying:
fatalError(".modifying state misuse")
}

let threadCount = self.threads!.count
return (unlockWith: workState, result: threadCount)
}

return (unlockWith: threadCount, result: ())
}

self.process(identifier: id)
return ()
}
}

cond.lock(whenValue: self.numberOfThreads)
cond.unlock()
assert(self.lock.withLock { self.threads?.count ?? -1 } == self.numberOfThreads)
readyThreads.lock(whenValue: self.numberOfThreads)
readyThreads.unlock()

func threadCount() -> Int {
self.conditionLock.withLock {
(unlockWith: nil, result: self.threads?.count ?? -1)
}
}
assert(threadCount() == self.numberOfThreads)
}

deinit {
Expand Down Expand Up @@ -374,8 +433,9 @@ extension NIOThreadPool {
}
}
} onCancel: {
self.lock.withLockVoid {
self.conditionLock.withLock {
self.cancelledWorkIDs.insert(workID)
return (unlockWith: nil, result: ())
}
}
}
Expand Down Expand Up @@ -427,3 +487,31 @@ extension NIOThreadPool {
}
}
}

extension ConditionLock {
@inlinable
func _lock(when value: T?) {
if let value = value {
self.lock(whenValue: value)
} else {
self.lock()
}
}

@inlinable
func _unlock(with value: T?) {
if let value = value {
self.unlock(withValue: value)
} else {
self.unlock()
}
}

@inlinable
func withLock<Result>(when value: T? = nil, _ body: () -> (unlockWith: T?, result: Result)) -> Result {
self._lock(when: value)
let (unlockValue, result) = body()
self._unlock(with: unlockValue)
return result
}
}
Loading