Skip to content

Commit

Permalink
pythongh-128002: use per threads tasks linked list in asyncio (python…
Browse files Browse the repository at this point in the history
…#128869)

Co-authored-by: Łukasz Langa <[email protected]>
  • Loading branch information
kumaraditya303 and ambv authored Feb 6, 2025
1 parent b4ff8b2 commit 0d68b14
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 58 deletions.
7 changes: 7 additions & 0 deletions Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ struct _is {
PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
_PyIndexPool tlbc_indices;
#endif
// Per-interpreter list of tasks, any lingering tasks from thread
// states gets added here and removed from the corresponding
// thread state's list.
struct llist_node asyncio_tasks_head;
// `asyncio_tasks_lock` is used when tasks are moved
// from thread's list to interpreter's list.
PyMutex asyncio_tasks_lock;

// Per-interpreter state for the obmalloc allocator. For the main
// interpreter and for all interpreters that don't have their
Expand Down
2 changes: 1 addition & 1 deletion Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ typedef enum _PyLockFlags {

// Lock a mutex with an optional timeout and additional options. See
// _PyLockFlags for details.
extern PyLockStatus
extern PyAPI_FUNC(PyLockStatus)
_PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags);

// Lock a mutex with additional options. See _PyLockFlags for details.
Expand Down
4 changes: 2 additions & 2 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime);
// Perform a stop-the-world pause for threads in the specified interpreter.
//
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
extern void _PyEval_StopTheWorld(PyInterpreterState *interp);
extern void _PyEval_StartTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp);


static inline void
Expand Down
5 changes: 5 additions & 0 deletions Include/internal/pycore_tstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ typedef struct _PyThreadStateImpl {
PyObject *asyncio_running_loop; // Strong reference
PyObject *asyncio_running_task; // Strong reference

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it used in `asyncio.all_tasks`.
*/
struct llist_node asyncio_tasks_head;
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
struct llist_node mem_free_queue; // delayed free queue


#ifdef Py_GIL_DISABLED
struct _gc_thread_state gc;
struct _mimalloc_thread_state mimalloc;
Expand Down
19 changes: 18 additions & 1 deletion Lib/test/test_asyncio/test_free_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import unittest
from threading import Thread
from unittest import TestCase

import weakref
from test import support
from test.support import threading_helper

threading_helper.requires_working_threading(module=True)
Expand Down Expand Up @@ -95,6 +96,22 @@ def check():
done.set()
runner.join()

def test_task_different_thread_finalized(self) -> None:
task = None
async def func():
nonlocal task
task = asyncio.current_task()

thread = Thread(target=lambda: asyncio.run(func()))
thread.start()
thread.join()
wr = weakref.ref(task)
del thread
del task
# task finalization in different thread shouldn't crash
support.gc_collect()
self.assertIsNone(wr())

def test_run_coroutine_threadsafe(self) -> None:
results = []

Expand Down
165 changes: 112 additions & 53 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ typedef struct TaskObj {
PyObject *task_name;
PyObject *task_context;
struct llist_node task_node;
#ifdef Py_GIL_DISABLED
// thread id of the thread where this task was created
uintptr_t task_tid;
#endif
} TaskObj;

typedef struct {
Expand Down Expand Up @@ -94,14 +98,6 @@ typedef struct {
|| PyObject_TypeCheck(obj, state->FutureType) \
|| PyObject_TypeCheck(obj, state->TaskType))

#ifdef Py_GIL_DISABLED
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
#else
# define ASYNCIO_STATE_LOCK(state) ((void)state)
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
#endif

typedef struct _Py_AsyncioModuleDebugOffsets {
struct _asyncio_task_object {
uint64_t size;
Expand Down Expand Up @@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)

/* State of the _asyncio module */
typedef struct {
#ifdef Py_GIL_DISABLED
PyMutex mutex;
#endif
PyTypeObject *FutureIterType;
PyTypeObject *TaskStepMethWrapper_Type;
PyTypeObject *FutureType;
Expand Down Expand Up @@ -184,11 +177,6 @@ typedef struct {
/* Counter for autogenerated Task names */
uint64_t task_name_counter;

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it. Third party tasks implementations which don't inherit from
`asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
*/
struct llist_node asyncio_tasks_head;
} asyncio_state;

static inline asyncio_state *
Expand Down Expand Up @@ -2179,16 +2167,15 @@ static PyMethodDef TaskWakeupDef = {
static void
register_task(asyncio_state *state, TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next != NULL) {
// already registered
assert(task->task_node.prev != NULL);
goto exit;
return;
}
llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
struct llist_node *head = &tstate->asyncio_tasks_head;
llist_insert_tail(head, &task->task_node);
}

static int
Expand All @@ -2197,19 +2184,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
return PySet_Add(state->eager_tasks, task);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
static inline void
unregister_task_safe(TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next == NULL) {
// not registered
assert(task->task_node.prev == NULL);
goto exit;
return;
}
llist_remove(&task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
{
assert(Task_Check(state, task));
#ifdef Py_GIL_DISABLED
// check if we are in the same thread
// if so, we can avoid locking
if (task->task_tid == _Py_ThreadId()) {
unregister_task_safe(task);
}
else {
// we are in a different thread
// stop the world then check and remove the task
PyThreadState *tstate = _PyThreadState_GET();
_PyEval_StopTheWorld(tstate->interp);
unregister_task_safe(task);
_PyEval_StartTheWorld(tstate->interp);
}
#else
unregister_task_safe(task);
#endif
}

static int
Expand Down Expand Up @@ -2423,6 +2429,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
}

Py_CLEAR(self->task_fut_waiter);
#ifdef Py_GIL_DISABLED
self->task_tid = _Py_ThreadId();
#endif
self->task_must_cancel = 0;
self->task_log_destroy_pending = 1;
self->task_num_cancels_requested = 0;
Expand Down Expand Up @@ -3981,6 +3990,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
static inline int
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
{
assert(PySet_CheckExact(tasks));
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
if (done == NULL) {
return -1;
Expand All @@ -4003,6 +4013,57 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
return 0;
}

static inline int
add_tasks_llist(struct llist_node *head, PyListObject *tasks)
{
struct llist_node *node;
llist_for_each_safe(node, head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
// deallocated while added to this list.
// To protect against concurrent deallocations,
// we first try to incref the task which would fail
// if it is concurrently getting deallocated in another thread,
// otherwise it gets added to the list.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) {
// do not call any escaping calls here while the world is stopped.
return -1;
}
}
}
return 0;
}

static inline int
add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
{
#ifdef Py_GIL_DISABLED
assert(interp->stoptheworld.world_stopped);
#endif
// Start traversing from interpreter's linked list
struct llist_node *head = &interp->asyncio_tasks_head;

if (add_tasks_llist(head, tasks) < 0) {
return -1;
}

int ret = 0;
// traverse the task lists of thread states
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)p;
head = &ts->asyncio_tasks_head;
if (add_tasks_llist(head, tasks) < 0) {
ret = -1;
goto exit;
}
}
exit:
_Py_FOR_EACH_TSTATE_END(interp);
return ret;
}

/*********************** Module **************************/

/*[clinic input]
Expand Down Expand Up @@ -4041,30 +4102,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
Py_DECREF(loop);
return NULL;
}
int err = 0;
ASYNCIO_STATE_LOCK(state);
struct llist_node *node;

llist_for_each_safe(node, &state->asyncio_tasks_head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
// deallocated while added to this list.
// To protect against concurrent deallocations,
// we first try to incref the task which would fail
// if it is concurrently getting deallocated in another thread,
// otherwise it gets added to the list.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
err = 1;
break;
}
}
}
ASYNCIO_STATE_UNLOCK(state);
if (err) {
PyInterpreterState *interp = PyInterpreterState_Get();
// Stop the world and traverse the per-thread linked list
// of asyncio tasks for every thread, as well as the
// interpreter's linked list, and add them to `tasks`.
// The interpreter linked list is used for any lingering tasks
// whose thread state has been deallocated while the task was
// still alive. This can happen if a task is referenced by
// a different thread, in which case the task is moved to
// the interpreter's linked list from the thread's linked
// list before deallocation. See PyThreadState_Clear.
//
// The stop-the-world pause is required so that no thread
// modifies its linked list while being iterated here
// in parallel. This design allows for lock-free
// register_task/unregister_task for loops running in parallel
// in different threads (the general case).
_PyEval_StopTheWorld(interp);
int ret = add_tasks_interp(interp, (PyListObject *)tasks);
_PyEval_StartTheWorld(interp);
if (ret < 0) {
// call any escaping calls after starting the world to avoid any deadlocks.
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
Expand Down Expand Up @@ -4348,7 +4408,6 @@ module_exec(PyObject *mod)
{
asyncio_state *state = get_asyncio_state(mod);

llist_init(&state->asyncio_tasks_head);

#define CREATE_TYPE(m, tp, spec, base) \
do { \
Expand Down
12 changes: 11 additions & 1 deletion Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,8 @@ init_interpreter(PyInterpreterState *interp,
_Py_brc_init_state(interp);
#endif
llist_init(&interp->mem_free_queue.head);
llist_init(&interp->asyncio_tasks_head);
interp->asyncio_tasks_lock = (PyMutex){0};
for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
interp->monitors.tools[i] = 0;
}
Expand Down Expand Up @@ -1512,7 +1514,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
tstate->delete_later = NULL;

llist_init(&_tstate->mem_free_queue);

llist_init(&_tstate->asyncio_tasks_head);
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
// Start in the suspended state if there is an ongoing stop-the-world.
tstate->state = _Py_THREAD_SUSPENDED;
Expand Down Expand Up @@ -1692,6 +1694,14 @@ PyThreadState_Clear(PyThreadState *tstate)
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task);


PyMutex_Lock(&tstate->interp->asyncio_tasks_lock);
// merge any lingering tasks from thread state to interpreter's
// tasks list
llist_concat(&tstate->interp->asyncio_tasks_head,
&((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
PyMutex_Unlock(&tstate->interp->asyncio_tasks_lock);

Py_CLEAR(tstate->dict);
Py_CLEAR(tstate->async_exc);

Expand Down

0 comments on commit 0d68b14

Please sign in to comment.