forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcondition.jl
190 lines (160 loc) · 7.24 KB
/
condition.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# This file is a part of Julia. License is MIT: https://julialang.org/license
## thread/task locking abstraction
# Throw the error used by the `assert_havelock` methods when a required lock is not
# held by the caller. Never returns normally.
@noinline function concurrency_violation()
    # Debugging aid: enable the next line to print a backtrace where the violation occurs.
    #try; error(); catch; ccall(:jlbacktrace, Cvoid, ()); end
    violation = ConcurrencyViolationError("lock must be held")
    throw(violation)
end
"""
    AbstractLock

Abstract supertype describing types that
implement the synchronization primitives:
[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref).
"""
abstract type AbstractLock end
# Declare the generic functions of the lock interface without any methods.
# This file adds methods for `AlwaysLockedST` and `GenericCondition` further down.
function lock end
function unlock end
function trylock end
function islocked end
# Internal hooks used to implement `wait(::GenericCondition)`: `unlockall` releases the
# lock and hands back a token, and `relockall` takes that token to re-acquire the lock.
#
# The generic fallback releases the lock with a single `unlock` call and always uses
# `nothing` as its token, regardless of what the lock's own `unlock` method returns.
# Forwarding the return value of `unlock` would make the matching
# `relockall(l, token::Nothing)` fail with a `MethodError` for any lock whose `unlock`
# returns a value, leaving the lock unheld when `wait` exits.
function unlockall(l::AbstractLock)
    unlock(l)
    return nothing
end

# Generic fallback: re-acquire `l` with a single `lock` call. Only accepts the `nothing`
# token produced by the generic `unlockall` above.
function relockall(l::AbstractLock, token::Nothing)
    return lock(l)
end
# Check that `l` is locked and that the recorded owner `tid` identifies the caller.
# Returns `nothing` on success; otherwise calls `concurrency_violation()`.

# Owner recorded as a thread id: must equal the current thread's id.
function assert_havelock(l::AbstractLock, tid::Integer)
    if islocked(l) && tid == Threads.threadid()
        return nothing
    else
        return concurrency_violation()
    end
end

# Owner recorded as a task: must be the currently running task.
function assert_havelock(l::AbstractLock, tid::Task)
    if islocked(l) && tid === current_task()
        return nothing
    else
        return concurrency_violation()
    end
end

# No owner recorded: always a violation.
function assert_havelock(l::AbstractLock, tid::Nothing)
    return concurrency_violation()
end
"""
    AlwaysLockedST

A stand-in for a lock that performs no real locking. It pretends to be permanently
locked on the thread it was allocated on, and simply ignores all other interactions.
It does not synchronize tasks; for that use a real lock such as [`ReentrantLock`](@ref).

It can be used in the place of a real lock to simply and cheaply assert that an
operation only ever occurs on a single cooperatively-scheduled thread.
It is thus functionally equivalent to allocating a real, recursive, task-unaware lock,
immediately calling `lock` on it, and then never calling a matching `unlock`,
except that calling `lock` from another thread will throw a concurrency violation exception.
"""
struct AlwaysLockedST <: AbstractLock
    # id of the thread that constructed this object
    ownertid::Int16

    function AlwaysLockedST()
        return new(Threads.threadid())
    end
end
# Lock interface for `AlwaysLockedST`. `lock` and `unlock` only verify that the caller
# is on the owning thread, and the object always reports itself as locked.

function assert_havelock(l::AlwaysLockedST)
    return assert_havelock(l, l.ownertid)
end

function lock(l::AlwaysLockedST)
    return assert_havelock(l)
end

function unlock(l::AlwaysLockedST)
    return assert_havelock(l)
end

# Succeeds exactly when called from the owning thread.
function trylock(l::AlwaysLockedST)
    here = Threads.threadid()
    return l.ownertid == here
end

function islocked(::AlwaysLockedST)
    return true
end
## condition variables
"""
    GenericCondition

Abstract implementation of a condition object
for synchronizing task objects with a given lock.
"""
struct GenericCondition{L<:AbstractLock}
    # tasks currently waiting on this condition
    waitq::IntrusiveLinkedList{Task}
    # lock associated with this condition
    lock::L

    # Construct with a freshly created lock of type `L`.
    function GenericCondition{L}() where {L<:AbstractLock}
        return new{L}(IntrusiveLinkedList{Task}(), L())
    end

    # Construct around an existing lock of type `L`.
    function GenericCondition{L}(l::L) where {L<:AbstractLock}
        return new{L}(IntrusiveLinkedList{Task}(), l)
    end

    # Construct around an existing lock, taking the type parameter from the lock itself.
    function GenericCondition(l::AbstractLock)
        return new{typeof(l)}(IntrusiveLinkedList{Task}(), l)
    end
end
# Lock interface for `GenericCondition`: every operation is forwarded to the
# condition's own lock, `c.lock`.

function assert_havelock(c::GenericCondition)
    return assert_havelock(c.lock)
end

function lock(c::GenericCondition)
    return lock(c.lock)
end

function unlock(c::GenericCondition)
    return unlock(c.lock)
end

function trylock(c::GenericCondition)
    return trylock(c.lock)
end

function islocked(c::GenericCondition)
    return islocked(c.lock)
end

# Function-first form, usable with `do`-block syntax.
function lock(f, c::GenericCondition)
    return lock(f, c.lock)
end
# Register `waiter` as waiting on `c`. The lock of `c` must be held by the caller.
# With `first=true` the waiter goes to the front of the wait queue, otherwise to the back.
function _wait2(c::GenericCondition, waiter::Task, first::Bool=false)
    parent = current_task()
    assert_havelock(c)
    first ? pushfirst!(c.waitq, waiter) : push!(c.waitq, waiter)
    # since _wait2 is similar to schedule, we should observe the sticky bit now
    needs_coschedule = waiter.sticky && Threads.threadid(waiter) == 0
    if needs_coschedule
        # Issue #41324
        # A sticky task with tid == 0 needs to be co-scheduled with the parent task.
        # If the parent (current_task) is not sticky we must set it to be sticky.
        # XXX: Ideally we would be able to unset this
        parent.sticky = true
        # the runtime call takes a 0-based thread id
        ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, Threads.threadid() - 1)
    end
    return
end
"""
    wait([x])

Block the current task until some event occurs, depending on the type of the argument:

* [`Channel`](@ref): Wait for a value to be appended to the channel.
* [`Condition`](@ref): Wait for [`notify`](@ref) on a condition and return the `val`
  parameter passed to `notify`. Waiting on a condition additionally allows passing
  `first=true`, which puts the waiter _first_ in line to wake up on `notify`
  instead of following the usual first-in-first-out order.
* `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process
  can be used to determine success or failure.
* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, a
  `TaskFailedException` (which wraps the failed task) is thrown.
* [`RawFD`](@ref): Wait for changes on a file descriptor (see the `FileWatching` package).

If no argument is passed, the task blocks for an undefined period. A task can only be
restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).

Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
proceeding.
"""
function wait(c::GenericCondition; first::Bool=false)
    me = current_task()
    # enqueue ourselves (this also checks that the lock is held) ...
    _wait2(c, me, first)
    # ... then release the lock for the duration of the wait
    token = unlockall(c.lock)
    try
        return wait()
    catch
        # if this task is still linked into a queue, unlink it before propagating
        if me.queue !== nothing
            list_deletefirst!(me.queue, me)
        end
        rethrow()
    finally
        # re-acquire the lock on every exit path
        relockall(c.lock, token)
    end
end
"""
    notify(condition, val=nothing; all=true, error=false)

Wake up tasks waiting for a condition, passing them `val`. When `all` is `true` (the
default) every waiting task is woken; otherwise only one is. When `error` is `true`, the
passed value is raised as an exception in the woken tasks.

Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`.
"""
@constprop :none function notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false)
    return notify(c, arg, all, error)
end

# Positional form that does the work. The lock of `c` must be held by the caller.
function notify(c::GenericCondition, @nospecialize(arg), all, error)
    assert_havelock(c)
    queue = c.waitq
    woken = 0
    while !isempty(queue)
        # take waiters from the front of the queue
        schedule(popfirst!(queue), arg, error=error)
        woken += 1
        all || break
    end
    return woken
end
# Wake every task waiting on `c`, raising `err` as an exception in each of them.
function notify_error(c::GenericCondition, err)
    return notify(c, err, true, true)
end
"""
    isempty(condition)

Report whether the wait queue of `condition` is empty: `true` when no tasks are waiting
on the condition, `false` otherwise.
"""
function isempty(c::GenericCondition)
    return isempty(c.waitq)
end
# default (Julia v1.0) is currently single-threaded
# (although it uses MT-safe versions, when possible)
# `Condition` is `GenericCondition` parameterized by `AlwaysLockedST`, a lock stand-in
# that only asserts single-thread use and does no real locking.
"""
    Condition()

Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
called can be woken up. For level-triggered notifications, you must keep extra state to keep
track of whether a notification has happened. The [`Channel`](@ref) and [`Threads.Event`](@ref) types do
this, and can be used for level-triggered events.

This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version.
"""
const Condition = GenericCondition{AlwaysLockedST}
# `GenericCondition{AlwaysLockedST}` (i.e. `Condition`) rejects explicit locking:
# both `lock` and `unlock` throw an `ArgumentError` pointing to `Threads.Condition`.

function lock(c::GenericCondition{AlwaysLockedST})
    msg = "`Condition` is not thread-safe. Please use `Threads.Condition` instead for multi-threaded code."
    throw(ArgumentError(msg))
end

function unlock(c::GenericCondition{AlwaysLockedST})
    msg = "`Condition` is not thread-safe. Please use `Threads.Condition` instead for multi-threaded code."
    throw(ArgumentError(msg))
end