-
Notifications
You must be signed in to change notification settings - Fork 651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use condition variable in NIOThreadPool #2964
Merged
+141
−46
Merged
Changes from 3 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
43f2de3
Use condition variable in NIOThreadPool
glbrntt dcb480e
Appease the formatting gods
glbrntt af71d8b
Merge branch 'main' into nio-thread-pool-warnings
glbrntt fd4677f
Update Sources/NIOPosix/NIOThreadPool.swift
glbrntt 0dbbd34
clarify strategy for condition lock value
glbrntt a8ec301
Merge branch 'main' into nio-thread-pool-warnings
glbrntt b014071
Merge branch 'main' into nio-thread-pool-warnings
glbrntt File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,9 +79,15 @@ public final class NIOThreadPool { | |
/// It should never be "leaked" outside of the lock block. | ||
case modifying | ||
} | ||
private let semaphore = DispatchSemaphore(value: 0) | ||
private let lock = NIOLock() | ||
private var threads: [NIOThread]? = nil // protected by `lock` | ||
|
||
/// Whether threads in the pool have work. | ||
private enum WorkState: Hashable { | ||
case hasWork | ||
case hasNoWork | ||
} | ||
|
||
private let conditionLock: ConditionLock<WorkState> | ||
private var threads: [NIOThread]? = nil // protected by `conditionLock` | ||
private var state: State = .stopped | ||
|
||
// WorkItems don't have a handle so they can't be cancelled directly. Instead an ID is assigned | ||
|
@@ -124,7 +130,7 @@ public final class NIOThreadPool { | |
return | ||
} | ||
|
||
let threadsToJoin = self.lock.withLock { () -> [NIOThread] in | ||
let threadsToJoin = self.conditionLock.withLock { | ||
switch self.state { | ||
case .running(let items): | ||
self.state = .modifying | ||
|
@@ -133,17 +139,17 @@ public final class NIOThreadPool { | |
item.workItem(.cancelled) | ||
} | ||
} | ||
self.state = .shuttingDown(Array(repeating: true, count: numberOfThreads)) | ||
for _ in (0..<numberOfThreads) { | ||
self.semaphore.signal() | ||
} | ||
self.state = .shuttingDown(Array(repeating: true, count: self.numberOfThreads)) | ||
|
||
let threads = self.threads! | ||
defer { | ||
self.threads = nil | ||
} | ||
return threads | ||
self.threads = nil | ||
|
||
// Each thread has work to do: shutdown. | ||
return (unlockWith: .hasWork, result: threads) | ||
|
||
case .shuttingDown, .stopped: | ||
return [] | ||
return (unlockWith: .hasWork, result: []) | ||
glbrntt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
case .modifying: | ||
fatalError(".modifying state misuse") | ||
} | ||
|
@@ -171,22 +177,33 @@ public final class NIOThreadPool { | |
} | ||
|
||
private func _submit(id: Int?, _ body: @escaping WorkItem) { | ||
let item = self.lock.withLock { () -> WorkItem? in | ||
let submitted = self.conditionLock.withLock { | ||
let workState: WorkState | ||
let submitted: Bool | ||
|
||
switch self.state { | ||
case .running(var items): | ||
self.state = .modifying | ||
items.append(.init(workItem: body, id: id)) | ||
self.state = .running(items) | ||
self.semaphore.signal() | ||
return nil | ||
workState = items.isEmpty ? .hasNoWork : .hasWork | ||
submitted = true | ||
|
||
case .shuttingDown, .stopped: | ||
return body | ||
workState = .hasWork | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question here. It might be useful to write out the strategy for the lock value in a comment above it, which will make it easier to audit. |
||
submitted = false | ||
|
||
case .modifying: | ||
fatalError(".modifying state misuse") | ||
} | ||
|
||
return (unlockWith: workState, result: submitted) | ||
} | ||
|
||
// if item couldn't be added run it immediately indicating that it couldn't be run | ||
item.map { $0(.cancelled) } | ||
if !submitted { | ||
body(.cancelled) | ||
} | ||
} | ||
|
||
/// Initialize a `NIOThreadPool` thread pool with `numberOfThreads` threads. | ||
|
@@ -209,17 +226,18 @@ public final class NIOThreadPool { | |
private init(numberOfThreads: Int, canBeStopped: Bool) { | ||
self.numberOfThreads = numberOfThreads | ||
self.canBeStopped = canBeStopped | ||
self.conditionLock = ConditionLock(value: .hasNoWork) | ||
} | ||
|
||
private func process(identifier: Int) { | ||
var itemAndState: (item: WorkItem, state: WorkItemState)? = nil | ||
|
||
repeat { | ||
// wait until work has become available | ||
itemAndState = nil // ensure previous work item is not retained for duration of semaphore wait | ||
self.semaphore.wait() | ||
itemAndState = nil // ensure previous work item is not retained while waiting for the condition | ||
itemAndState = self.conditionLock.withLock(when: .hasWork) { | ||
let workState: WorkState | ||
let result: (WorkItem, WorkItemState)? | ||
|
||
itemAndState = self.lock.withLock { () -> (WorkItem, WorkItemState)? in | ||
switch self.state { | ||
case .running(var items): | ||
self.state = .modifying | ||
|
@@ -233,18 +251,32 @@ public final class NIOThreadPool { | |
} | ||
|
||
self.state = .running(items) | ||
return (itemAndID.workItem, state) | ||
|
||
workState = items.isEmpty ? .hasNoWork : .hasWork | ||
result = (itemAndID.workItem, state) | ||
|
||
case .shuttingDown(var aliveStates): | ||
self.state = .modifying | ||
assert(aliveStates[identifier]) | ||
aliveStates[identifier] = false | ||
self.state = .shuttingDown(aliveStates) | ||
return nil | ||
|
||
// Unlock with '.hasWork' to resume any other threads waiting to shutdown. | ||
workState = .hasWork | ||
result = nil | ||
|
||
case .stopped: | ||
return nil | ||
// Unreachable: 'stopped' is the initial state which is left when starting the | ||
// thread pool, and before any thread calls this function. | ||
fatalError("Invalid state") | ||
|
||
case .modifying: | ||
fatalError(".modifying state misuse") | ||
} | ||
|
||
return (unlockWith: workState, result: result) | ||
} | ||
|
||
// if there was a work item popped, run it | ||
itemAndState.map { item, state in item(state) } | ||
} while itemAndState != nil | ||
|
@@ -256,16 +288,24 @@ public final class NIOThreadPool { | |
} | ||
|
||
public func _start(threadNamePrefix: String) { | ||
let alreadyRunning: Bool = self.lock.withLock { | ||
let alreadyRunning = self.conditionLock.withLock { | ||
switch self.state { | ||
case .running(_): | ||
return true | ||
case .shuttingDown(_): | ||
case .running: | ||
// Already running, this has no effect on whether there is more work for the | ||
// threads to run. | ||
return (unlockWith: nil, result: true) | ||
|
||
case .shuttingDown: | ||
// This should never happen | ||
fatalError("start() called while in shuttingDown") | ||
|
||
case .stopped: | ||
self.state = .running(Deque(minimumCapacity: 16)) | ||
return false | ||
assert(self.threads == nil) | ||
self.threads = [] | ||
self.threads!.reserveCapacity(self.numberOfThreads) | ||
return (unlockWith: .hasNoWork, result: false) | ||
|
||
case .modifying: | ||
fatalError(".modifying state misuse") | ||
} | ||
|
@@ -278,31 +318,50 @@ public final class NIOThreadPool { | |
// We use this condition lock as a tricky kind of semaphore. | ||
// This is done to sidestep the thread performance checker warning | ||
// that would otherwise be emitted. | ||
let cond = ConditionLock(value: 0) | ||
|
||
self.lock.withLock { | ||
assert(self.threads == nil) | ||
self.threads = [] | ||
self.threads?.reserveCapacity(self.numberOfThreads) | ||
} | ||
|
||
let readyThreads = ConditionLock(value: 0) | ||
for id in 0..<self.numberOfThreads { | ||
// We should keep thread names under 16 characters because Linux doesn't allow more. | ||
NIOThread.spawnAndRun(name: "\(threadNamePrefix)\(id)", detachThread: false) { thread in | ||
self.lock.withLock { | ||
self.threads!.append(thread) | ||
cond.lock() | ||
cond.unlock(withValue: self.threads!.count) | ||
readyThreads.withLock { | ||
let threadCount = self.conditionLock.withLock { | ||
self.threads!.append(thread) | ||
let workState: WorkState | ||
|
||
switch self.state { | ||
case .running(let items): | ||
workState = items.isEmpty ? .hasNoWork : .hasWork | ||
case .shuttingDown: | ||
// The thread has work to do: it's shutting down. | ||
workState = .hasWork | ||
case .stopped: | ||
// Unreachable: .stopped always transitions to .running in the function | ||
// and .stopped is never entered again. | ||
fatalError("Invalid state") | ||
case .modifying: | ||
fatalError(".modifying state misuse") | ||
} | ||
|
||
let threadCount = self.threads!.count | ||
return (unlockWith: workState, result: threadCount) | ||
} | ||
|
||
return (unlockWith: threadCount, result: ()) | ||
} | ||
|
||
self.process(identifier: id) | ||
return () | ||
} | ||
} | ||
|
||
cond.lock(whenValue: self.numberOfThreads) | ||
cond.unlock() | ||
assert(self.lock.withLock { self.threads?.count ?? -1 } == self.numberOfThreads) | ||
readyThreads.lock(whenValue: self.numberOfThreads) | ||
readyThreads.unlock() | ||
|
||
func threadCount() -> Int { | ||
self.conditionLock.withLock { | ||
(unlockWith: nil, result: self.threads?.count ?? -1) | ||
} | ||
} | ||
assert(threadCount() == self.numberOfThreads) | ||
} | ||
|
||
deinit { | ||
|
@@ -374,8 +433,9 @@ extension NIOThreadPool { | |
} | ||
} | ||
} onCancel: { | ||
self.lock.withLockVoid { | ||
self.conditionLock.withLock { | ||
self.cancelledWorkIDs.insert(workID) | ||
return (unlockWith: nil, result: ()) | ||
} | ||
} | ||
} | ||
|
@@ -427,3 +487,31 @@ extension NIOThreadPool { | |
} | ||
} | ||
} | ||
|
||
extension ConditionLock { | ||
@inlinable | ||
func _lock(when value: T?) { | ||
if let value = value { | ||
self.lock(whenValue: value) | ||
} else { | ||
self.lock() | ||
} | ||
} | ||
|
||
@inlinable | ||
func _unlock(with value: T?) { | ||
if let value = value { | ||
self.unlock(withValue: value) | ||
} else { | ||
self.unlock() | ||
} | ||
} | ||
|
||
@inlinable | ||
func withLock<Result>(when value: T? = nil, _ body: () -> (unlockWith: T?, result: Result)) -> Result { | ||
self._lock(when: value) | ||
let (unlockValue, result) = body() | ||
self._unlock(with: unlockValue) | ||
return result | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this state right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I don't think we need to unlock with a value here, if we're already shutting down then we've already signalled there's work for the threads to do, we don't need to do that again.