Skip to content

Commit

Permalink
fix(CancellationSource): prevent priority inversion of submitted tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Mar 20, 2023
1 parent 93aacfb commit 8fa6219
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ public extension CancellationSource {
/// will ensure newly created cancellation source receive cancellation event.
///
/// - Parameters:
/// - priority: The minimum priority of task that this source is going to handle.
/// By default, minimum priority of provided `sources` is used or
/// `.background` if no provided `sources`.
/// - sources: The cancellation sources the newly created object will be linked to.
/// - file: The file link request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
Expand All @@ -14,13 +17,19 @@ public extension CancellationSource {
/// explicitly as it defaults to `#line`).
///
/// - Returns: The newly created cancellation source.
///
/// - NOTE: `CancellationSource` uses `Task`'s `result` and `value` APIs
/// to wait for completion which has side effect of increasing `Task`'s priority.
/// Hence, provide the least priority for the submitted tasks to use in cancellation task.
init(
priority: TaskPriority? = nil,
linkedWith sources: [CancellationSource],
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
self.init()
let priority = priority ?? sources.map(\.priority).min() ?? .background
self.init(priority: priority)
sources.forEach {
$0.register(task: self, file: file, function: function, line: line)
}
Expand All @@ -32,6 +41,9 @@ public extension CancellationSource {
/// will ensure newly created cancellation source receive cancellation event.
///
/// - Parameters:
/// - priority: The minimum priority of task that this source is going to handle.
/// By default, minimum priority of provided `sources` is used or
/// `.background` if no provided `sources`.
/// - sources: The cancellation sources the newly created object will be linked to.
/// - file: The file link request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
Expand All @@ -41,14 +53,19 @@ public extension CancellationSource {
/// explicitly as it defaults to `#line`).
///
/// - Returns: The newly created cancellation source.
///
/// - NOTE: `CancellationSource` uses `Task`'s `result` and `value` APIs
/// to wait for completion which has side effect of increasing `Task`'s priority.
/// Hence, provide the least priority for the submitted tasks to use in cancellation task.
init(
priority: TaskPriority? = nil,
linkedWith sources: CancellationSource...,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
self.init(
linkedWith: sources,
priority: priority, linkedWith: sources,
file: file, function: function, line: line
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ public extension CancellationSource {
/// and triggers cancellation event on this object after specified timeout.
///
/// - Parameters:
/// - priority: The minimum priority of task that this source is going to handle.
/// By default, priority is `.background`.
/// - nanoseconds: The delay after which cancellation event triggered.
/// - file: The file cancel request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
Expand All @@ -12,13 +14,18 @@ public extension CancellationSource {
/// explicitly as it defaults to `#line`).
///
/// - Returns: The newly created cancellation source.
///
/// - NOTE: `CancellationSource` uses `Task`'s `result` and `value` APIs
/// to wait for completion which has side effect of increasing `Task`'s priority.
/// Hence, provide the least priority for the submitted tasks to use in cancellation task.
init(
priority: TaskPriority = .background,
cancelAfterNanoseconds nanoseconds: UInt64,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
self.init()
self.init(priority: priority)
self.cancel(
afterNanoseconds: nanoseconds,
file: file, function: function, line: line
Expand Down Expand Up @@ -88,6 +95,8 @@ public extension CancellationSource {
/// and triggers cancellation event on this object at specified deadline.
///
/// - Parameters:
/// - priority: The minimum priority of task that this source is going to handle.
/// By default, priority is `.background`.
/// - deadline: The instant in the provided clock at which cancellation event triggered.
/// - clock: The clock for which cancellation deadline provided.
/// - file: The file cancel request originates from (there's usually no need to pass it
Expand All @@ -98,14 +107,19 @@ public extension CancellationSource {
/// explicitly as it defaults to `#line`).
///
/// - Returns: The newly created cancellation source.
///
/// - NOTE: `CancellationSource` uses `Task`'s `result` and `value` APIs
/// to wait for completion which has side effect of increasing `Task`'s priority.
/// Hence, provide the least priority for the submitted tasks to use in cancellation task.
init<C: Clock>(
priority: TaskPriority = .background,
at deadline: C.Instant,
clock: C,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
self.init()
self.init(priority: priority)
self.cancel(
at: deadline, clock: clock,
file: file, function: function, line: line
Expand Down
34 changes: 27 additions & 7 deletions Sources/AsyncObjects/CancellationSource/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import Foundation

/// An object that controls cooperative cancellation of multiple registered tasks and linked object registered tasks.
///
/// You can register tasks for cancellation using the ``register(task:file:function:line:)`` method
/// and link with additional sources by creating object with ``init(linkedWith:)`` method.
/// You can register tasks for cancellation using the ``register(task:file:function:line:)`` method and link with
/// additional sources by creating object with ``init(priority:linkedWith:file:function:line:)-7b9gf`` method.
/// By calling the ``cancel(file:function:line:)`` method all the registered tasks will be cancelled
/// and the cancellation event will be propagated to linked cancellation sources,
/// which in turn cancels their registered tasks and further propagates cancellation.
Expand Down Expand Up @@ -42,10 +42,12 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
/// The lifetime task that is cancelled when
/// `CancellationSource` is cancelled.
@usableFromInline
var lifetime: Task<Void, Error>!
let lifetime: Task<Void, Error>!
/// The stream continuation used to register work items
/// for cooperative cancellation.
var pipe: AsyncStream<WorkItem>.Continuation!
let pipe: AsyncStream<WorkItem>.Continuation!
/// The priority of the detached cancellation task.
let priority: TaskPriority

/// A Boolean value that indicates whether cancellation is already
/// invoked on the source.
Expand All @@ -59,10 +61,21 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {

/// Creates a new cancellation source object.
///
/// - Parameters:
/// - priority: The minimum priority of task that this source is going to handle.
/// By default, priority is `.background`.
///
/// - Returns: The newly created cancellation source.
public init() {
let stream = AsyncStream<WorkItem> { self.pipe = $0 }
self.lifetime = Task.detached {
///
/// - NOTE: `CancellationSource` uses `Task`'s `result` and `value` APIs
/// to wait for completion which has side effect of increasing `Task`'s priority.
/// Hence, provide the least priority for the submitted tasks to use in cancellation task.
public init(priority: TaskPriority = .background) {
var continuation: AsyncStream<WorkItem>.Continuation!
let stream = AsyncStream<WorkItem> { continuation = $0 }
self.priority = priority
self.pipe = continuation
self.lifetime = Task.detached(priority: priority) {
try await withThrowingTaskGroup(of: Void.self) { group in
for await item in stream {
group.addTask {
Expand Down Expand Up @@ -94,6 +107,10 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
/// pass it explicitly as it defaults to `#function`).
/// - line: The line work registration originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
///
/// - Important: Do not use this method to link `CancellationSource` as it might introduce
/// circular linking which will cause all the affected cancellation tasks to leak.
/// Use ``init(priority:linkedWith:file:function:line:)-7b9gf`` instead.
@Sendable
public func register<C: Cancellable>(
task: C,
Expand Down Expand Up @@ -158,6 +175,9 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
/// pass it explicitly as it defaults to `#function`).
/// - line: The line wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
///
/// - Important: Using this method might introduce circular linking of`CancellationSource`
/// which will cause all the affected cancellation tasks to leak.
@Sendable
public func wait(
file: String = #fileID,
Expand Down

0 comments on commit 8fa6219

Please sign in to comment.