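The following sections work through each layer in turn to show how thread startup is actually implemented.
At the threading layer, a thread is started by calling start(). Each Thread object holds a _started event that records whether the thread has started; the event is shared between the parent and child threads. At the end of start(), the parent thread calls _started.wait() and blocks until the child thread signals that it is running. The _initialized flag records whether Thread.__init__() has run, that is, whether the object was created and initialized properly.
The threading module also keeps global bookkeeping of Thread objects. On entering start(), the parent thread first stores the child Thread object in the global _limbo dictionary, which holds threads that are about to start. Once the operating system has created the thread, the child adds itself to the global _active dictionary and removes itself from _limbo. Both are shared globals, and every write must hold the _active_limbo_lock mutex.
The threading module calls _thread.start_new_thread (aliased as _start_new_thread) to actually start the child thread. Its first argument is the function the child thread executes: the outermost entry point is Thread._bootstrap(self), and the real work is done in Thread._bootstrap_inner(self). This wrapper follows the template method pattern and is responsible for thread bookkeeping, calling into user code, and exception handling.
Thread bookkeeping means maintaining _limbo and _active according to the thread's state. Calling into user code means invoking Thread.run() in the child thread, which by default calls the target function and can be overridden in a subclass. Exception handling passes any uncaught exception to the configured hook, which by default is threading.excepthook (it prints the traceback to sys.stderr and silently ignores SystemExit).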
sequenceDiagram
participant User as Parent thread
participant Thread as Thread object
participant TModule as _thread module
participant Bootstrap as Child thread
User->>Thread: t = Thread(target=func, args=(1,2))
Note over Thread: self._target = func<br/>self._args = (1,2)<br/>self._kwargs = {}<br/>self._started = Event()<br/>self._initialized = True<br/>self._invoke_excepthook = _make_invoke_excepthook()
rect rgb(245, 245, 254)
Note over User: Start the child thread
User->>Thread: t.start()
activate Thread
Thread->>Thread: assert self._initialized
Thread->>Thread: _limbo[self] = self
Note over Thread: _limbo: global dict of threads pending start<br/>temporarily holds threads awaiting confirmation
Thread->>TModule: _thread.start_new_thread(self._bootstrap, ())
TModule-->>Thread: thread_id
Thread->>Thread: self._started.wait()
Note over Thread: Event.wait() blocks until the new thread confirms startup
end
TModule->>Bootstrap: New thread runs _bootstrap()
activate Bootstrap
Bootstrap->>Thread: self._started.set()
Note over Thread: Event.set() wakes start() and unblocks it
deactivate Thread
Bootstrap->>Bootstrap: _active[self._ident] = self<br/>del _limbo[self]
Note over Bootstrap: _active: global dict of active threads<br/>holds the threads that are running
rect rgb(240, 255, 240)
Note over Bootstrap: try: run user code
Bootstrap->>Bootstrap: try:
Bootstrap->>Bootstrap: self.run()
Bootstrap->>User: self._target(*self._args, **self._kwargs)
Note over User: Runs the user function func(1, 2)
end
rect rgb(255, 240, 240)
Bootstrap->>Bootstrap: except SystemExit:
Note over Bootstrap: SystemExit exits silently
Bootstrap->>Bootstrap: except:
Bootstrap->>Bootstrap: self._invoke_excepthook(self)
Note over Bootstrap: Call the exception hook for the uncaught exception
Bootstrap->>Bootstrap: finally:
Bootstrap->>Bootstrap: del _active[self._ident]
Note over Bootstrap: Remove from the active dict, clean up and exit
end
deactivate Bootstrap
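The code behind the sequence diagram above is shown below; it includes details the diagram leaves out, such as daemon inheritance and the _stderr and _invoke_excepthook setup used for exception reporting. Note that _limbo maps Thread objects to themselves, whereas _active maps ident (the thread identifier assigned by the operating system) to Thread objects. The child thread obtains its ident by calling self._set_ident() in _bootstrap_inner, which stores it in self._ident. Beyond the three responsibilities described above, _bootstrap_inner also installs the trace and profile hooks, which is what allows debuggers such as pdb to work in threads.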
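The ident bookkeeping can be checked with public APIs only. This is a small sketch, assuming that threading.enumerate() reflects the contents of _active and _limbo; the names seen, ready, release and worker are illustrative.

```python
import threading

seen = {}                      # filled in by the child thread
ready = threading.Event()      # child -> parent: "I recorded my ident"
release = threading.Event()    # parent -> child: "you may finish now"


def worker():
    seen["ident"] = threading.get_ident()
    ready.set()
    release.wait()


t = threading.Thread(target=worker)
ident_before_start = t.ident            # not set until the child runs
t.start()
ready.wait()
listed_while_running = t in threading.enumerate()
release.set()
t.join()
listed_after_join = t in threading.enumerate()
```

Before start() the ident is None; once the child is running, the ident it sees through threading.get_ident() is the same value the parent reads from t.ident.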
import sys as _sys
import _thread
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {} # maps thread id to Thread object
_limbo = {}
class Thread:
_initialized = False
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
assert group is None, "group argument must be None for now"
if kwargs is None:
kwargs = {}
self._target = target
self._name = str(name or _newname())
self._args = args
self._kwargs = kwargs
if daemon is not None:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
self._ident = None
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
self._tstate_lock = None
self._started = Event()
self._is_stopped = False
self._initialized = True
# Copy of sys.stderr used by self._invoke_excepthook()
self._stderr = _sys.stderr
self._invoke_excepthook = _make_invoke_excepthook()
# For debugging and _after_fork()
_dangling.add(self)
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
def _bootstrap(self):
# Wrapper around the real bootstrap code that ignores
# exceptions during interpreter cleanup. Those typically
# happen when a daemon thread wakes up at an unfortunate
# moment, finds the world around it destroyed, and raises some
# random exception *** while trying to report the exception in
# _bootstrap_inner() below ***. Those random exceptions
# don't help anybody, and they confuse users, so we suppress
# them. We suppress them only when it appears that the world
# indeed has already been destroyed, so that exceptions in
# _bootstrap_inner() during normal business hours are properly
# reported. Also, we only suppress them for daemonic threads;
# if a non-daemonic encounters this, something else is wrong.
try:
self._bootstrap_inner()
except:
if self._daemonic and _sys is None:
return
raise
def _bootstrap_inner(self):
try:
self._set_ident()
self._set_tstate_lock()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
self._started.set()
with _active_limbo_lock:
_active[self._ident] = self
del _limbo[self]
if _trace_hook:
_sys.settrace(_trace_hook)
if _profile_hook:
_sys.setprofile(_profile_hook)
try:
self.run()
except:
self._invoke_excepthook(self)
finally:
with _active_limbo_lock:
try:
# We don't call self._delete() because it also
# grabs _active_limbo_lock.
del _active[get_ident()]
except:
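pass
As Thread.start(self) shows, starting a thread boils down to calling _thread.start_new_thread, which is implemented in Modules/_threadmodule.c by the function thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs).
thread_PyThread_start_new_thread packs its arguments into a struct bootstate, which carries the environment the thread's code needs, namely the interpreter state and the thread state. It then calls PyThread_start_new_thread (in Python/thread_pthread.h on POSIX systems) with t_bootstrap as the thread entry point and the bootstate as its argument, and finally returns the thread identifier.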
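The low-level entry point can also be called directly from Python. The sketch below assumes nothing beyond the standard _thread module; entry, done and result are illustrative names. It shows the integer identifier that is returned and the argument check on the second parameter.

```python
import _thread

done = _thread.allocate_lock()
done.acquire()          # held until the child releases it
result = []


def entry(a, b, scale=1):
    result.append((a + b) * scale)
    done.release()      # a plain lock may be released by another thread


# func, args tuple, optional kwargs dict
ident = _thread.start_new_thread(entry, (1, 2), {"scale": 10})
done.acquire()          # blocks until entry() has run

try:
    _thread.start_new_thread(entry, [1, 2])   # args is a list, not a tuple
except TypeError as exc:
    error_message = str(exc)
```

Unlike threading.Thread, this interface has no join(), which is why the sketch uses a lock to wait for the child.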
sequenceDiagram
participant Py as threading module
participant TNT as thread_PyThread_start_new_thread function
participant PyState as pystate.c
participant PThread as thread_pthread.h
Py->>TNT: _thread.start_new_thread(func, args, kwargs)
activate TNT
rect rgb(245, 255, 255)
Note over TNT: Argument checks
TNT->>TNT: PyCallable_Check(func)<br/>PyTuple_Check(args)<br/>PyDict_Check(kwargs)
end
rect rgb(240, 255, 240)
Note over TNT: Allocate and initialize bootstate
TNT->>TNT: boot = PyMem_NEW(struct bootstate, 1)
TNT->>PyState: boot->interp = _PyInterpreterState_Get()
Note over PyState: Get the parent thread's interpreter state
TNT->>PyState: boot->tstate = _PyThreadState_Prealloc(boot->interp)
Note over PyState: Preallocate the child thread state
TNT->>TNT: boot->func = func<br/>boot->args = args<br/>boot->keyw = kwargs
end
rect rgb(240, 240, 255)
Note over TNT: Reference counts and the GIL
TNT->>TNT: Py_INCREF(func)<br/>Py_INCREF(args)<br/>Py_XINCREF(kwargs)
Note over TNT: Keep the objects alive while the thread uses them
TNT->>TNT: PyEval_InitThreads()
Note over TNT: Make sure the GIL has been created
end
rect rgb(255, 255, 240)
Note over TNT,PThread: Create the OS thread
TNT->>PThread: PyThread_start_new_thread(t_bootstrap, (void*) boot)
PThread-->>TNT: ident (unsigned long)
end
TNT-->>Py: PyLong_FromUnsignedLong(ident)
deactivate TNT
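The implementation is shown below. Apart from the module and data structure definitions, the flow follows the sequence diagram above step by step.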
// Modules/_threadmodule.c
static PyMethodDef thread_methods[] = {
    {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    // remaining entries omitted
    {NULL, NULL} /* sentinel */
};
static struct PyModuleDef threadmodule = {
PyModuleDef_HEAD_INIT,
"_thread",
thread_doc,
-1,
thread_methods,
NULL,
NULL,
NULL,
NULL
};
struct bootstate {
PyInterpreterState *interp;
PyObject *func;
PyObject *args;
PyObject *keyw;
PyThreadState *tstate;
};
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
// Unpack and validate the arguments
PyObject *func, *args, *keyw = NULL;
struct bootstate *boot;
unsigned long ident;
// Unpack 2 to 3 items from the tuple fargs into func, args and keyw.
// "start_new_thread" is only used in error messages.
if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
&func, &args, &keyw))
return NULL;
if (!PyCallable_Check(func)) {
PyErr_SetString(PyExc_TypeError,
"first arg must be callable");
return NULL;
}
if (!PyTuple_Check(args)) {
PyErr_SetString(PyExc_TypeError,
"2nd arg must be a tuple");
return NULL;
}
if (keyw != NULL && !PyDict_Check(keyw)) {
PyErr_SetString(PyExc_TypeError,
"optional 3rd arg must be a dictionary");
return NULL;
}
// Allocate memory for the bootstate
boot = PyMem_NEW(struct bootstate, 1);
if (boot == NULL)
return PyErr_NoMemory();
// Run the child thread in the parent thread's interpreter
boot->interp = _PyInterpreterState_Get();
boot->func = func;
boot->args = args;
boot->keyw = keyw;
// Preallocate the child thread's state
boot->tstate = _PyThreadState_Prealloc(boot->interp);
if (boot->tstate == NULL) {
PyMem_DEL(boot);
return PyErr_NoMemory();
}
Py_INCREF(func);
Py_INCREF(args);
Py_XINCREF(keyw);
// Make sure the GIL has been created
PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
if (ident == PYTHREAD_INVALID_THREAD_ID) {
PyErr_SetString(ThreadError, "can't start new thread");
Py_DECREF(func);
Py_DECREF(args);
Py_XDECREF(keyw);
PyThreadState_Clear(boot->tstate);
PyMem_DEL(boot);
return NULL;
}
return PyLong_FromUnsignedLong(ident);
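}
The details of t_bootstrap are as follows: it acquires the GIL and then, in the given execution environment, calls the function wrapped in struct bootstate. An uncaught exception other than SystemExit is not propagated any further; it is reported through _PyErr_WriteUnraisableMsg. When the call returns, t_bootstrap releases the bootstate and tears down the thread state.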
sequenceDiagram
participant T1 as Parent thread
participant OS as Operating system
participant T2 as Child thread
participant TBoot as t_bootstrap function
participant GIL as GIL machinery
rect rgb(245, 255, 245)
Note over T1: Parent thread creates the child thread
T1->>OS: pthread_create(t_bootstrap, boot_raw)
OS->>T2: Create the new thread
Note over OS,T2: The OS scheduler puts the child thread on the ready queue
end
rect rgb(245, 245, 255)
Note over OS,T2: The OS schedules the child thread
OS->>T2: Context switch (save parent state, restore child state)
T2->>TBoot: t_bootstrap(boot_raw)
activate TBoot
end
rect rgb(255, 255, 245)
Note over TBoot: t_bootstrap initialization
TBoot->>TBoot: boot = (struct bootstate *) boot_raw<br/>tstate = boot->tstate
Note over TBoot: Bind the OS thread ID to the PyThreadState
TBoot->>TBoot: tstate->thread_id = PyThread_get_thread_ident()
Note over TBoot: Initialize the thread state
TBoot->>TBoot: _PyThreadState_Init(&_PyRuntime, tstate)
end
rect rgb(255, 245, 255)
Note over TBoot,GIL: Acquire the GIL
TBoot->>GIL: PyEval_AcquireThread(tstate)
GIL->>GIL: take_gil(ceval, tstate)
GIL->>GIL: MUTEX_LOCK(gil->mutex)
Note over GIL: pthread_mutex_lock(gil->mutex)
loop Wait for the GIL
Note over GIL: While the GIL is held by another thread
GIL->>GIL: while (gil->locked)
Note over GIL: Wait on the condition variable (default interval 5 ms)
GIL->>GIL: COND_TIMED_WAIT(gil->cond, gil->mutex, interval, timed_out)
Note over GIL: On timeout with no switch, ask the holder to drop the GIL
GIL->>GIL: SET_GIL_DROP_REQUEST(ceval)
end
Note over GIL: Mark the GIL as locked
GIL->>GIL: _Py_atomic_store_relaxed(&gil->locked, 1)
Note over GIL: Record this thread as the last holder
GIL->>GIL: _Py_atomic_store_relaxed(&gil->last_holder, tstate)
GIL->>GIL: MUTEX_UNLOCK(gil->mutex)
Note over GIL: pthread_mutex_unlock()
Note over GIL: Make tstate the current thread state
GIL->>GIL: _PyThreadState_Swap(&runtime->gilstate, tstate)
Note over TBoot: Increment the interpreter's thread count
TBoot->>TBoot: tstate->interp->num_threads++
end
rect rgb(245, 255, 255)
Note over TBoot: Call the supplied Python function
TBoot->>TBoot: res = PyObject_Call(boot->func, boot->args, boot->keyw)
alt res == NULL and the exception is SystemExit
Note over TBoot: SystemExit is ignored silently
TBoot->>TBoot: if (PyErr_ExceptionMatches(PyExc_SystemExit))<br/>PyErr_Clear()
else res == NULL with any other exception
Note over TBoot: Reported as unraisable, not propagated
TBoot->>TBoot: _PyErr_WriteUnraisableMsg("in thread started by", boot->func)
end
end
rect rgb(255, 240, 240)
Note over TBoot: Release the bootstrap arguments
TBoot->>TBoot: Py_DECREF(boot->func)<br/>Py_DECREF(boot->args)<br/>Py_XDECREF(boot->keyw)<br/>PyMem_DEL(boot_raw)
Note over TBoot: Decrement the interpreter's thread count
TBoot->>TBoot: tstate->interp->num_threads--
end
rect rgb(245, 245, 245)
Note over TBoot: Tear down the thread state
TBoot->>TBoot: PyThreadState_Clear(tstate)
Note over TBoot: Clear the thread state object<br/>frame, dict, curexc_*, ...
TBoot->>GIL: PyThreadState_DeleteCurrent()
Note over GIL: Unlink tstate from the interpreter's list<br/>call drop_gil() to release the GIL
GIL->>GIL: drop_gil()
Note over GIL: Wake a waiting thread<br/>COND_SIGNAL(gil->cond)
Note over TBoot: Call pthread_exit(NULL)<br/>to exit the thread
TBoot->>TBoot: PyThread_exit_thread()
deactivate TBoot
end
The corresponding implementation and the functions it depends on are shown below.
// Modules/_threadmodule.c
static void
t_bootstrap(void *boot_raw)
{
struct bootstate *boot = (struct bootstate *) boot_raw;
PyThreadState *tstate;
PyObject *res;
tstate = boot->tstate;
tstate->thread_id = PyThread_get_thread_ident();
_PyThreadState_Init(&_PyRuntime, tstate);
PyEval_AcquireThread(tstate);
tstate->interp->num_threads++;
res = PyObject_Call(boot->func, boot->args, boot->keyw);
if (res == NULL) {
if (PyErr_ExceptionMatches(PyExc_SystemExit))
/* SystemExit is ignored silently */
PyErr_Clear();
else {
_PyErr_WriteUnraisableMsg("in thread started by", boot->func);
}
}
else {
Py_DECREF(res);
}
Py_DECREF(boot->func);
Py_DECREF(boot->args);
Py_XDECREF(boot->keyw);
PyMem_DEL(boot_raw);
tstate->interp->num_threads--;
PyThreadState_Clear(tstate);
PyThreadState_DeleteCurrent();
PyThread_exit_thread();
}
// Python/ceval.c
void
PyEval_AcquireThread(PyThreadState *tstate)
{
if (tstate == NULL) {
Py_FatalError("PyEval_AcquireThread: NULL new thread state");
}
_PyRuntimeState *runtime = &_PyRuntime;
struct _ceval_runtime_state *ceval = &runtime->ceval;
/* Check someone has called PyEval_InitThreads() to create the lock */
assert(gil_created(&ceval->gil));
take_gil(ceval, tstate);
exit_thread_if_finalizing(runtime, tstate);
if (_PyThreadState_Swap(&runtime->gilstate, tstate) != NULL) {
Py_FatalError("PyEval_AcquireThread: non-NULL old thread state");
}
}
// Python/ceval_gil.h
static void
take_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate)
{
if (tstate == NULL) {
Py_FatalError("take_gil: NULL tstate");
}
struct _gil_runtime_state *gil = &ceval->gil;
int err = errno;
MUTEX_LOCK(gil->mutex);
if (!_Py_atomic_load_relaxed(&gil->locked)) {
goto _ready;
}
while (_Py_atomic_load_relaxed(&gil->locked)) {
int timed_out = 0;
unsigned long saved_switchnum;
saved_switchnum = gil->switch_number;
unsigned long interval = (gil->interval >= 1 ? gil->interval : 1);
COND_TIMED_WAIT(gil->cond, gil->mutex, interval, timed_out);
/* If we timed out and no switch occurred in the meantime, it is time
to ask the GIL-holding thread to drop it. */
if (timed_out &&
_Py_atomic_load_relaxed(&gil->locked) &&
gil->switch_number == saved_switchnum)
{
SET_GIL_DROP_REQUEST(ceval);
}
}
_ready:
#ifdef FORCE_SWITCHING
/* This mutex must be taken before modifying gil->last_holder:
see drop_gil(). */
MUTEX_LOCK(gil->switch_mutex);
#endif
/* We now hold the GIL */
_Py_atomic_store_relaxed(&gil->locked, 1);
_Py_ANNOTATE_RWLOCK_ACQUIRED(&gil->locked, /*is_write=*/1);
if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil->last_holder)) {
_Py_atomic_store_relaxed(&gil->last_holder, (uintptr_t)tstate);
++gil->switch_number;
}
#ifdef FORCE_SWITCHING
COND_SIGNAL(gil->switch_cond);
MUTEX_UNLOCK(gil->switch_mutex);
#endif
if (_Py_atomic_load_relaxed(&ceval->gil_drop_request)) {
RESET_GIL_DROP_REQUEST(ceval);
}
if (tstate->async_exc != NULL) {
_PyEval_SignalAsyncExc(ceval);
}
MUTEX_UNLOCK(gil->mutex);
errno = err;
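}
PyThread_start_new_thread is CPython's lowest-level thread creation function. It wraps the entry function and its argument in a callback structure and then calls pthread_create to create the thread.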
sequenceDiagram
participant Caller as _thread.thread_PyThread_start_new_thread function
participant PST as PyThread_start_new_thread function
participant OS as Operating system (pthread API)
participant NewThread as Child thread
participant Wrapper as pythread_wrapper function
participant UserFunc as User function
rect rgb(245, 255, 245)
Note over Caller,PST: Calling thread requests a new thread
Caller->>PST: PyThread_start_new_thread(t_bootstrap, (void*) boot)
activate PST
alt !initialized
Note over PST: First call, initialize the thread system
PST->>PST: PyThread_init_thread()
Note over PST: Initialize condition variable attributes
end
end
rect rgb(245, 245, 255)
Note over PST: Configure thread attributes (stack size, scheduling scope, ...)
alt THREAD_STACK_SIZE or PTHREAD_SYSTEM_SCHED_SUPPORTED
PST->>OS: pthread_attr_init(&attrs)
OS-->>PST: attrs
alt THREAD_STACK_SIZE defined
Note over PST: Read the configured stack size
PST->>PST: tstate = _PyThreadState_GET()<br/>stacksize = tstate->interp->pythread_stacksize<br/>tss = stacksize ? stacksize : THREAD_STACK_SIZE
alt tss != 0
PST->>OS: pthread_attr_setstacksize(&attrs, tss)
Note over OS: Set the thread stack size
alt Setting fails
PST->>OS: pthread_attr_destroy(&attrs)
PST-->>Caller: PYTHREAD_INVALID_THREAD_ID
end
end
end
alt PTHREAD_SYSTEM_SCHED_SUPPORTED defined
PST->>OS: pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM)
Note over OS: Use system-wide scheduling scope
end
end
end
rect rgb(255, 255, 245)
Note over PST: Prepare the callback structure
PST->>PST: callback = PyMem_RawMalloc(sizeof(pythread_callback))
alt callback == NULL
Note over PST: Memory allocation failed
PST-->>Caller: PYTHREAD_INVALID_THREAD_ID
end
Note over PST: Wrap the entry function and its argument
PST->>PST: callback->func = func<br/>callback->arg = arg
end
rect rgb(255, 245, 255)
Note over PST,OS: Create the new thread
PST->>OS: pthread_create(&th, &attrs, pythread_wrapper, callback)
alt Creation succeeds
Note over OS: The operating system creates the thread
OS->>NewThread: Thread is created and scheduled
Note over OS,NewThread: The OS scheduler puts the new thread on the ready queue
OS-->>PST: status = 0
Note over PST: Destroy the thread attribute object
alt attrs was initialized
PST->>OS: pthread_attr_destroy(&attrs)
end
Note over PST: Detach the thread (resources reclaimed automatically)
PST->>OS: pthread_detach(th)
Note over OS: Resources are reclaimed when the thread ends
Note over PST: Return the thread identifier
PST->>PST: thread_id = (unsigned long) th
PST-->>Caller: thread_id
else Creation fails
OS-->>PST: status != 0
Note over PST: Clean up
alt attrs was initialized
PST->>OS: pthread_attr_destroy(&attrs)
end
PST->>PST: PyMem_RawFree(callback)
PST-->>Caller: PYTHREAD_INVALID_THREAD_ID
end
deactivate PST
end
rect rgb(245, 255, 255)
Note over NewThread,Wrapper: The OS schedules the new thread
OS->>NewThread: Context switch to the new thread
NewThread->>Wrapper: pythread_wrapper(callback)
activate Wrapper
Note over Wrapper: Unpack the callback data
Wrapper->>Wrapper: void (*func)(void *) = callback->func<br/>void *func_arg = callback->arg
Note over Wrapper: Free the temporary callback structure
Wrapper->>Wrapper: PyMem_RawFree(callback)
end
rect rgb(255, 240, 240)
Note over Wrapper,UserFunc: Run the supplied function
Wrapper->>UserFunc: func(func_arg)
activate UserFunc
Note over UserFunc: The supplied function runs (t_bootstrap in this call chain)
UserFunc-->>Wrapper: Function returns
deactivate UserFunc
Wrapper-->>NewThread: return NULL
deactivate Wrapper
end
rect rgb(245, 245, 245)
Note over NewThread: Thread exits normally
NewThread->>OS: pthread_exit(0)
Note over OS: The operating system reclaims the thread's resources
end
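The code is shown below. pthread (POSIX Threads) is a portable threading API defined by POSIX and implemented by each operating system (on Linux, by NPTL). The relevant types are pthread_t, the thread identifier passed to the pthread_* functions, and pthread_attr_t, the thread attribute object that holds settings such as stack size and scheduling scope and is passed as configuration when a thread is created.
The relevant functions are: pthread_attr_init and pthread_attr_destroy, which initialize and destroy a pthread_attr_t; the pthread_attr_set* family, which sets individual attributes; pthread_create, which creates a thread and takes a pointer to the thread identifier, the thread attributes, the entry function, and its argument; and pthread_detach, which detaches a thread so that its resources are reclaimed automatically when it terminates, without another thread having to call pthread_join. This is why a Python program never has to reap its threads explicitly after starting them.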
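The stack size attribute is the one piece of this configuration that is exposed to Python code, through threading.stack_size(). A minimal sketch, assuming a platform that accepts a 256 KiB stack (the value is an arbitrary choice for this example, not something taken from CPython):

```python
import threading

previous = threading.stack_size()      # 0 means "use the platform default"
threading.stack_size(256 * 1024)       # applies to threads created from now on
configured = threading.stack_size()

ran = []
t = threading.Thread(target=lambda: ran.append(True))
t.start()                              # created with the configured stack size
t.join()

threading.stack_size(previous)         # restore the earlier setting
```

The setting only affects threads created after the call; threads that already exist keep their stacks.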
// Python/thread_pthread.h
typedef struct {
void (*func) (void *);
void *arg;
} pythread_callback;
static void *
pythread_wrapper(void *arg)
{
/* copy func and func_arg and free the temporary structure */
pythread_callback *callback = arg;
void (*func)(void *) = callback->func;
void *func_arg = callback->arg;
PyMem_RawFree(arg);
func(func_arg);
return NULL;
}
unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
pthread_t th;
int status;
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_t attrs;
#endif
#if defined(THREAD_STACK_SIZE)
size_t tss;
#endif
dprintf(("PyThread_start_new_thread called\n"));
if (!initialized)
PyThread_init_thread();
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
if (pthread_attr_init(&attrs) != 0)
return PYTHREAD_INVALID_THREAD_ID;
#endif
#if defined(THREAD_STACK_SIZE)
PyThreadState *tstate = _PyThreadState_GET();
size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
tss = (stacksize != 0) ? stacksize : THREAD_STACK_SIZE;
if (tss != 0) {
if (pthread_attr_setstacksize(&attrs, tss) != 0) {
pthread_attr_destroy(&attrs);
return PYTHREAD_INVALID_THREAD_ID;
}
}
#endif
#if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
#endif
pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback));
if (callback == NULL) {
return PYTHREAD_INVALID_THREAD_ID;
}
callback->func = func;
callback->arg = arg;
status = pthread_create(&th,
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
&attrs,
#else
(pthread_attr_t*)NULL,
#endif
pythread_wrapper, callback);
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_destroy(&attrs);
#endif
if (status != 0) {
PyMem_RawFree(callback);
return PYTHREAD_INVALID_THREAD_ID;
}
pthread_detach(th);
#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
return (unsigned long) th;
#else
return (unsigned long) *(unsigned long *) &th;
#endif
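}
The parent thread waits for a child to finish, or for a timeout to expire, by calling join() on the child's Thread object. join() first checks that the call is valid and then tries to acquire the child's _tstate_lock. If the lock has already been released, the child has finished; otherwise the parent blocks until it is woken when the child ends or until the timeout expires.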
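Because join() always returns None, a timeout can only be detected by calling is_alive() afterwards. A short sketch of both outcomes (the Event named release exists only to keep the child alive long enough for the example):

```python
import threading

release = threading.Event()
t = threading.Thread(target=release.wait)   # child blocks until released
t.start()

first = t.join(timeout=0.05)                # times out, child still waiting
alive_after_timeout = t.is_alive()

release.set()                               # let the child finish
t.join()                                    # blocks until the child has ended
alive_after_join = t.is_alive()
```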
sequenceDiagram
participant Caller as Parent thread
participant ThreadObj as Thread object
participant TStateLock as _tstate_lock
participant WorkerThread as Child thread
participant CRuntime as _thread.t_bootstrap function
rect rgb(245, 255, 245)
Note over Caller: Parent thread starts waiting for the child
Caller->>ThreadObj: t.join(timeout)
activate ThreadObj
Note over ThreadObj: Validate arguments and state
ThreadObj->>ThreadObj: if not self._initialized:<br/>raise RuntimeError("Thread.__init__() not called")
ThreadObj->>ThreadObj: if not self._started.is_set():<br/>raise RuntimeError("cannot join thread before it is started")
ThreadObj->>ThreadObj: if self is current_thread():<br/>raise RuntimeError("cannot join current thread")
end
rect rgb(245, 245, 255)
Note over ThreadObj: Choose the waiting strategy
alt timeout is None (wait indefinitely)
ThreadObj->>ThreadObj: _wait_for_tstate_lock()
Note over ThreadObj: block=True, timeout=-1
else timeout is given (wait with timeout)
ThreadObj->>ThreadObj: _wait_for_tstate_lock(timeout=max(timeout, 0))
end
end
rect rgb(255, 255, 245)
Note over ThreadObj,TStateLock: Try to acquire self._tstate_lock
ThreadObj->>TStateLock: lock = self._tstate_lock
alt lock is None (child has already finished)
ThreadObj->>ThreadObj: assert self._is_stopped
ThreadObj-->>Caller: Return immediately
else lock exists (child may still be running)
Note over TStateLock: Try to acquire the lock
end
end
rect rgb(255, 245, 255)
Note over TStateLock,WorkerThread: Child thread keeps running
activate WorkerThread
WorkerThread->>WorkerThread: Run user code
Note over TStateLock: _tstate_lock is held by the child thread
TStateLock->>TStateLock: lock.acquire(block, timeout)
Note over TStateLock: Parent blocks here until the lock is released
Note over Caller,TStateLock: Parent sleeps while the OS schedules other threads
end
rect rgb(245, 255, 255)
Note over WorkerThread: Child thread is about to end
WorkerThread->>WorkerThread: _bootstrap_inner() completes
WorkerThread->>WorkerThread: finally: del _active[get_ident()]
WorkerThread->>CRuntime: Thread exit cleanup
deactivate WorkerThread
end
rect rgb(255, 240, 245)
Note over CRuntime: Thread state teardown and lock release
CRuntime->>CRuntime: PyThreadState_Clear(tstate)
Note over CRuntime: Clear the Python thread state
CRuntime->>CRuntime: frame, dict, curexc_*, ...
CRuntime->>CRuntime: Unlink the thread state from the interpreter's list
Note over CRuntime: Release _tstate_lock
CRuntime->>TStateLock: tstate_lock.release()
Note over TStateLock: Lock released, waiting thread is woken
end
rect rgb(245, 255, 240)
Note over TStateLock,Caller: Parent thread is woken
TStateLock-->>ThreadObj: lock.acquire() returns True
Note over ThreadObj: Lock acquired, so the child has finished
ThreadObj->>TStateLock: lock.release()
ThreadObj->>ThreadObj: self._stop()
Note over ThreadObj: Mark the thread as stopped and drop the lock reference
Note over ThreadObj: self._is_stopped = True<br/>self._tstate_lock = None
end
rect rgb(240, 255, 240)
ThreadObj-->>Caller: Return None
deactivate ThreadObj
Note over Caller: Child thread has finished
end
rect rgb(255, 245, 245)
Note over Caller: Timeout case
alt Timeout occurs
Note over TStateLock: lock.acquire(timeout) returns False
ThreadObj-->>Caller: Return None (child still running)
Note over Caller: Call thread.is_alive() to check
end
end
The implementation of join is shown below; in essence it is an example of inter-thread communication through a mutex.
class Thread:
_initialized = False
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
self._tstate_lock = None
self._started = Event()
self._is_stopped = False
self._initialized = True
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if timeout is None:
self._wait_for_tstate_lock()
else:
# the behavior of a negative timeout isn't documented, but
# historically .join(timeout=x) for x<0 has acted as if timeout=0
self._wait_for_tstate_lock(timeout=max(timeout, 0))
def _wait_for_tstate_lock(self, block=True, timeout=-1):
# Issue #18808: wait for the thread state to be gone.
# At the end of the thread's life, after all knowledge of the thread
# is removed from C data structures, C code releases our _tstate_lock.
# This method passes its arguments to _tstate_lock.acquire().
# If the lock is acquired, the C code is done, and self._stop() is
# called. That sets ._is_stopped to True, and ._tstate_lock to None.
lock = self._tstate_lock
if lock is None: # already determined that the C code is done
assert self._is_stopped
elif lock.acquire(block, timeout):
lock.release()
self._stop()
Unlike other locks, _tstate_lock is released automatically when the thread ends. It is created by _thread._set_sentinel when the child thread starts and is immediately acquired by the child.
import _thread
_set_sentinel = _thread._set_sentinel
class Thread:
    def _set_tstate_lock(self):
        """
        Set a lock object which will be released by the interpreter when
        the underlying thread state (see pystate.h) gets deleted.
        """
        self._tstate_lock = _set_sentinel()
        self._tstate_lock.acquire()
        if not self.daemon:
            with _shutdown_locks_lock:
                _shutdown_locks.add(self._tstate_lock)
    def _bootstrap_inner(self):
        try:
            # remaining code omitted
            self._set_tstate_lock()
            # remaining code omitted
        finally:
            # remaining code omitted
            pass
Inside _set_sentinel(), the lock is created by newlockobject() and attached through a weak reference to the on_delete_data field of the child's thread state, with release_sentinel installed as the on_delete callback. When the thread finishes and t_bootstrap tears down its thread state (PyThreadState_Clear followed by PyThreadState_DeleteCurrent), the interpreter invokes release_sentinel, which releases the lock.
// Modules/_threadmodule.c
static PyObject *
thread__set_sentinel(PyObject *self, PyObject *Py_UNUSED(ignored))
{
PyObject *wr;
PyThreadState *tstate = PyThreadState_Get();
lockobject *lock;
if (tstate->on_delete_data != NULL) {
/* We must support the re-creation of the lock from a
fork()ed child. */
assert(tstate->on_delete == &release_sentinel);
wr = (PyObject *) tstate->on_delete_data;
tstate->on_delete = NULL;
tstate->on_delete_data = NULL;
Py_DECREF(wr);
}
lock = newlockobject();
if (lock == NULL)
return NULL;
/* The lock is owned by whoever called _set_sentinel(), but the weakref
hangs to the thread state. */
wr = PyWeakref_NewRef((PyObject *) lock, NULL);
if (wr == NULL) {
Py_DECREF(lock);
return NULL;
}
tstate->on_delete_data = (void *) wr;
tstate->on_delete = &release_sentinel;
return (PyObject *) lock;
}
release_sentinel is implemented as follows; it releases the lock by calling PyThread_release_lock.
// Modules/_threadmodule.c
static void
release_sentinel(void *wr_raw)
{
PyObject *wr = _PyObject_CAST(wr_raw);
/* Tricky: this function is called when the current thread state
is being deleted. Therefore, only simple C code can safely
execute here. */
PyObject *obj = PyWeakref_GET_OBJECT(wr);
lockobject *lock;
if (obj != Py_None) {
assert(Py_TYPE(obj) == &Locktype);
lock = (lockobject *) obj;
if (lock->locked) {
PyThread_release_lock(lock->lock_lock);
lock->locked = 0;
}
}
/* Deallocating a weakref with a NULL callback only calls
PyObject_GC_Del(), which can't call any Python code. */
Py_DECREF(wr);
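}
The _started event, through which the child thread tells the parent that it has started, is an Event. The parent blocks in wait() until the event occurs, and the child calls set() to notify every waiting thread.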
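class Thread:
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        self._started = Event()
    def start(self):
        # code omitted
        self._started.wait()
    def _bootstrap_inner(self):
        # code omitted
        self._started.set()
Event keeps a flag, _flag, that is true once the event has occurred and false otherwise. It also keeps a condition variable, _cond, which guards access to _flag and is used to block waiting threads.
set(self) sets _flag to True and notifies every thread blocked on _cond. wait(self, timeout) returns immediately if _flag is already True; otherwise it puts the calling thread on _cond's wait queue until the event occurs or the timeout expires.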
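The return value of wait() reflects the flag, which a short example makes concrete. This sketch uses only the public threading.Event API; started and child are illustrative names mirroring the _started handshake described above.

```python
import threading

started = threading.Event()

# Flag is still False, so a timed wait gives up and returns False.
timed_out_result = started.wait(timeout=0.01)


def child():
    started.set()          # wakes every thread blocked in wait()


t = threading.Thread(target=child)
t.start()
signaled = started.wait(timeout=5)   # True once the child has called set()
t.join()

started.clear()            # reset the flag; later wait() calls block again
after_clear = started.is_set()
```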
import _thread
_allocate_lock = _thread.allocate_lock
Lock = _allocate_lock
class Event:
"""Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset
to false with the clear() method. The wait() method blocks until the flag is
true. The flag is initially false.
"""
# After Tim Peters' event class (without is_posted())
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
isSet = is_set
def set(self):
"""Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads
that call wait() once the flag is true will not block at all.
"""
with self._cond:
self._flag = True
self._cond.notify_all()
def clear(self):
"""Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to
set the internal flag to true again.
"""
with self._cond:
self._flag = False
def wait(self, timeout=None):
"""Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise,
block until another thread calls set() to set the flag to true, or until
the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof).
This method returns the internal flag on exit, so it will always return
True except if a timeout is given and the operation times out.
"""
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
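return signaled
Condition provides the mutual exclusion, blocking, and notification described above. Event constructs _cond with Lock(), a non-reentrant mutex: even the thread that already holds it blocks if it tries to acquire it again.
The with statement maps to __enter__ and __exit__, which delegate to the same methods on _lock to acquire and release it. wait() creates a new non-reentrant lock, waiter, acquires it once, appends it to the _waiters queue, releases the underlying lock, and then acquires waiter a second time, which blocks the calling thread. notify() and notify_all() release the locks stored in _waiters one by one, which wakes the blocked threads.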
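Both properties that this design relies on can be checked in isolation: a plain Lock is not reentrant, and it may be released by a thread other than the one that acquired it. The sketch below is an illustration of those two facts, not a reimplementation of Condition; waiter and notifier are illustrative names.

```python
import threading

# 1. Lock is non-reentrant: a second acquire by the holder does not succeed.
lock = threading.Lock()
lock.acquire()
second_try = lock.acquire(timeout=0.05)
lock.release()

# 2. RLock is reentrant: the holder can acquire it again.
rlock = threading.RLock()
rlock.acquire()
reentered = rlock.acquire(timeout=0.05)
rlock.release()
rlock.release()

# 3. The waiter hand-off: one thread blocks on a lock it already holds,
#    and another thread wakes it by releasing that lock.
waiter = threading.Lock()
waiter.acquire()


def notifier():
    waiter.release()       # plays the role of notify()


t = threading.Thread(target=notifier)
t.start()
woken = waiter.acquire(timeout=5)   # plays the role of the second acquire in wait()
t.join()
```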
import _thread
from collections import deque as _deque
from itertools import islice as _islice
_allocate_lock = _thread.allocate_lock
class Condition:
"""Class that implements a condition variable.
A condition variable allows one or more threads to wait until they are
notified by another thread.
If the lock argument is given and not None, it must be a Lock or RLock
object, and it is used as the underlying lock. Otherwise, a new RLock object
is created and used as the underlying lock.
"""
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
def _release_save(self):
self._lock.release() # No state to save
def _acquire_restore(self, x):
self._lock.acquire() # Ignore saved state
def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if _lock doesn't have _is_owned().
if self._lock.acquire(0):
self._lock.release()
return False
else:
return True
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
"""Wake up one or more threads waiting on this condition, if any.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method wakes up at most n of the threads waiting for the condition
variable; it is a no-op if no threads are waiting.
"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
def notify_all(self):
"""Wake up all threads waiting on this condition.
If the calling thread has not acquired the lock when this method
is called, a RuntimeError is raised.
"""
self.notify(len(self._waiters))其中,关键的实现是互斥锁,其由 _thread 模块提供实现,如 _allocate_lock 接口提供不可重入锁对象的创建。_allocate_lock 接口对应到 thread_PyThread_allocate_lock 实现,其调用 newlockobject 函数创建锁。
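newlockobject first creates the lockobject instance, then calls PyThread_allocate_lock to create the real lock and stores it in the lock_lock field of lockobject. It sets the sanity-check flag locked to 0 and returns the lockobject instance. If the real lock cannot be allocated, it raises ThreadError and returns NULL.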
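From the Python side, the object that newlockobject builds is what _thread.allocate_lock() returns. The following short sketch observes its initial state; it uses locked(), a public method of lock objects, and the variable names are illustrative.

```python
import _thread

lock = _thread.allocate_lock()   # the call threading.py aliases as _allocate_lock

fresh = lock.locked()            # a new lock starts unlocked
lock.acquire()
held = lock.locked()             # locked after a successful acquire
lock.release()

with lock:                       # the with statement acquires on entry
    inside = lock.locked()
after = lock.locked()            # and releases on exit

print(fresh, held, inside, after)  # False True True False
```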
// Modules/_threadmodule.c
static lockobject *
newlockobject(void)
{
    lockobject *self;
    self = PyObject_New(lockobject, &Locktype);
    if (self == NULL)
        return NULL;
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    if (self->lock_lock == NULL) {
        Py_DECREF(self);
        PyErr_SetString(ThreadError, "can't allocate lock");
        return NULL;
    }
    return self;
}

static PyObject *
thread_PyThread_allocate_lock(PyObject *self, PyObject *Py_UNUSED(ignored))
{
    return (PyObject *) newlockobject();
}

lockobject is defined as follows. Its type is Locktype, whose methods include acquire to take the lock and release to drop it, plus __enter__ and __exit__ for the with statement, which map to the same two C functions.
// Modules/_threadmodule.c
static PyMethodDef lock_methods[] = {
    {"acquire", (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"release", (PyCFunction)lock_PyThread_release_lock,
     METH_NOARGS, release_doc},
    {"__enter__", (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"__exit__", (PyCFunction)lock_PyThread_release_lock,
     METH_VARARGS, release_doc},
    {NULL, NULL} /* sentinel */
};

static PyTypeObject Locktype = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    "_thread.lock", /*tp_name*/
    // remaining fields omitted
    lock_methods, /*tp_methods*/
};

typedef struct {
    PyObject_HEAD
    PyThread_type_lock lock_lock;
    PyObject *in_weakreflist;
    char locked; /* for sanity checking */
} lockobject;

As shown above, lockobject holds a PyThread_type_lock, which is the real lock implementation. PyThread_type_lock is an opaque void * pointer. In the mutex-plus-condition-variable build shown here (thread_pthread.h also contains a semaphore-based variant), it points to a pthread_lock, which keeps the lock state locked, the condition variable lock_released, and the mutex mut. PyThread_allocate_lock initializes them through the pthread pthread_*_init functions, so thread locks are also implemented separately for each platform.
// Python/thread_pthread.h
typedef void *PyThread_type_lock;

typedef struct {
    char locked; /* 0=unlocked, 1=locked */
    /* a <cond, mutex> pair to handle an acquire of a locked lock */
    pthread_cond_t lock_released;
    pthread_mutex_t mut;
} pthread_lock;

int
_PyThread_cond_init(PyCOND_T *cond)
{
    return pthread_cond_init(cond, condattr_monotonic);
}

PyThread_type_lock
PyThread_allocate_lock(void)
{
    pthread_lock *lock;
    int status, error = 0;
    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();
    lock = (pthread_lock *) PyMem_RawMalloc(sizeof(pthread_lock));
    if (lock) {
        memset((void *)lock, '\0', sizeof(pthread_lock));
        lock->locked = 0;
        status = pthread_mutex_init(&lock->mut, NULL);
        CHECK_STATUS_PTHREAD("pthread_mutex_init");
        /* Mark the pthread mutex underlying a Python mutex as
           pure happens-before. We can't simply mark the
           Python-level mutex as a mutex because it can be
           acquired and released in different threads, which
           will cause errors. */
        _Py_ANNOTATE_PURE_HAPPENS_BEFORE_MUTEX(&lock->mut);
        status = _PyThread_cond_init(&lock->lock_released);
        CHECK_STATUS_PTHREAD("pthread_cond_init");
        if (error) {
            PyMem_RawFree((void *)lock);
            lock = 0;
        }
    }
    dprintf(("PyThread_allocate_lock() -> %p\n", (void *)lock));
    return (PyThread_type_lock) lock;
}

Lock acquisition (acquire) is implemented by lock_PyThread_acquire_lock. It calls acquire_timed to take the lock; a return value of PY_LOCK_ACQUIRED means the lock was obtained, and acquire returns True.
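acquire_timed wraps an interruptible acquire: a signal, such as the SIGINT that produces KeyboardInterrupt, may interrupt the wait, and the acquire can resume afterwards. Concretely, it first calls PyThread_acquire_lock_timed in non-blocking mode without releasing the GIL. If that fails (PY_LOCK_FAILURE) and the timeout is non-zero, it releases the GIL and retries in blocking mode. If the blocking wait is interrupted, PyThread_acquire_lock_timed returns PY_LOCK_INTR and Py_MakePendingCalls runs the pending signal handlers. If a handler raises an exception, PY_LOCK_INTR is passed up to the caller and acquire fails with that exception; otherwise the timeout is recomputed and the loop tries again.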
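The return values of this path are visible from Python: PY_LOCK_ACQUIRED surfaces as True and PY_LOCK_FAILURE as False. The sketch below exercises the timed blocking branch with the public threading API only. The timeout values and variable names are arbitrary choices for illustration, and the interruption path is not shown because it depends on platform signal delivery.

```python
import threading
import time

lock = threading.Lock()
lock.acquire()                        # hold the lock so later attempts must wait

# Timed acquire on a held lock: waits, then gives up and reports failure.
start = time.monotonic()
timed_out = lock.acquire(timeout=0.2)
waited = time.monotonic() - start

# Timed acquire that succeeds because another thread releases the lock.
releaser = threading.Timer(0.1, lock.release)
releaser.start()
acquired = lock.acquire(timeout=5)
releaser.join()
lock.release()

print(timed_out, acquired)  # False True
```

The second acquire returns well before its 5 second limit, since the waiting thread is woken as soon as the timer thread releases the lock.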
static PyObject *
lock_PyThread_acquire_lock(lockobject *self, PyObject *args, PyObject *kwds)
{
    _PyTime_t timeout;
    PyLockStatus r;
    if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
        return NULL;
    r = acquire_timed(self->lock_lock, timeout);
    if (r == PY_LOCK_INTR) {
        return NULL;
    }
    if (r == PY_LOCK_ACQUIRED)
        self->locked = 1;
    return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
 * is interrupted, signal handlers are run, and if they raise an exception,
 * PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
 * are returned, depending on whether the lock can be acquired within the
 * timeout.
 */
static PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
    PyLockStatus r;
    _PyTime_t endtime = 0;
    _PyTime_t microseconds;
    if (timeout > 0)
        endtime = _PyTime_GetMonotonicClock() + timeout;
    do {
        microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);
        /* first a simple non-blocking try without releasing the GIL */
        r = PyThread_acquire_lock_timed(lock, 0, 0);
        if (r == PY_LOCK_FAILURE && microseconds != 0) {
            Py_BEGIN_ALLOW_THREADS
            r = PyThread_acquire_lock_timed(lock, microseconds, 1);
            Py_END_ALLOW_THREADS
        }
        if (r == PY_LOCK_INTR) {
            /* Run signal handlers if we were interrupted. Propagate
             * exceptions from signal handlers, such as KeyboardInterrupt, by
             * passing up PY_LOCK_INTR. */
            if (Py_MakePendingCalls() < 0) {
                return PY_LOCK_INTR;
            }
            /* If we're using a timeout, recompute the timeout after processing
             * signals, since those can take time. */
            if (timeout > 0) {
                timeout = endtime - _PyTime_GetMonotonicClock();
                /* Check for negative values, since those mean block forever.
                 */
                if (timeout < 0) {
                    r = PY_LOCK_FAILURE;
                }
            }
        }
    } while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */
    return r;
}

acquire_timed relies on PyThread_acquire_lock_timed to take the lock.
PyLockStatus
PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds,
                            int intr_flag)
{
    PyLockStatus success = PY_LOCK_FAILURE;
    pthread_lock *thelock = (pthread_lock *)lock;
    int status, error = 0;
    dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) called\n",
             lock, microseconds, intr_flag));
    if (microseconds == 0) {
        status = pthread_mutex_trylock( &thelock->mut );
        if (status != EBUSY)
            CHECK_STATUS_PTHREAD("pthread_mutex_trylock[1]");
    }
    else {
        status = pthread_mutex_lock( &thelock->mut );
        CHECK_STATUS_PTHREAD("pthread_mutex_lock[1]");
    }
    if (status == 0) {
        if (thelock->locked == 0) {
            success = PY_LOCK_ACQUIRED;
        }
        else if (microseconds != 0) {
            struct timespec abs;
            if (microseconds > 0) {
                _PyThread_cond_after(microseconds, &abs);
            }
            /* continue trying until we get the lock */
            /* mut must be locked by me -- part of the condition
             * protocol */
            while (success == PY_LOCK_FAILURE) {
                if (microseconds > 0) {
                    status = pthread_cond_timedwait(
                        &thelock->lock_released,
                        &thelock->mut, &abs);
                    if (status == 1) {
                        break;
                    }
                    if (status == ETIMEDOUT)
                        break;
                    CHECK_STATUS_PTHREAD("pthread_cond_timedwait");
                }
                else {
                    status = pthread_cond_wait(
                        &thelock->lock_released,
                        &thelock->mut);
                    CHECK_STATUS_PTHREAD("pthread_cond_wait");
                }
                if (intr_flag && status == 0 && thelock->locked) {
                    /* We were woken up, but didn't get the lock. We probably received
                     * a signal. Return PY_LOCK_INTR to allow the caller to handle
                     * it and retry. */
                    success = PY_LOCK_INTR;
                    break;
                }
                else if (status == 0 && !thelock->locked) {
                    success = PY_LOCK_ACQUIRED;
                }
            }
        }
        if (success == PY_LOCK_ACQUIRED) thelock->locked = 1;
        status = pthread_mutex_unlock( &thelock->mut );
        CHECK_STATUS_PTHREAD("pthread_mutex_unlock[1]");
    }
    if (error) success = PY_LOCK_FAILURE;
    dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) -> %d\n",
             lock, microseconds, intr_flag, success));
    return success;
}
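The structure of PyThread_acquire_lock_timed can be modelled in Python to make the roles of locked, mut, and lock_released easier to follow. This is an illustrative sketch under stated assumptions, not CPython's implementation: SketchLock and its attribute names are hypothetical, threading.Condition stands in for the pthread condition variable, the zero-timeout case takes the mutex with a blocking call instead of pthread_mutex_trylock, and a wake-up without the lock simply loops instead of returning PY_LOCK_INTR. The release method is not part of the listing above; it is assumed to clear the flag and signal the condition.

```python
import threading
import time


class SketchLock:
    """Toy model of pthread_lock: a flag guarded by a mutex and a condition."""

    def __init__(self):
        self._locked = False                                   # pthread_lock.locked
        self._mut = threading.Lock()                           # pthread_lock.mut
        self._lock_released = threading.Condition(self._mut)   # lock_released

    def acquire(self, timeout=-1):
        """timeout < 0 waits forever, 0 tries once, > 0 waits that many seconds."""
        deadline = None if timeout < 0 else time.monotonic() + timeout
        with self._mut:                      # take the guarding mutex
            while self._locked:              # someone else holds the logical lock
                if deadline is None:
                    self._lock_released.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return False         # corresponds to PY_LOCK_FAILURE
                    self._lock_released.wait(remaining)
            self._locked = True              # corresponds to PY_LOCK_ACQUIRED
            return True

    def release(self):
        with self._mut:
            self._locked = False
            self._lock_released.notify()     # wake one waiting acquirer


lock = SketchLock()
first = lock.acquire(0)                      # lock is free
second = lock.acquire(0)                     # already held, non-blocking try fails
timer = threading.Timer(0.05, lock.release)  # another thread releases it later
timer.start()
third = lock.acquire(2)                      # wakes up once released
timer.join()

print(first, second, third)  # True False True
```

The mutex is only held for the short time needed to inspect or change the flag, which is why the logical lock can be released by a thread other than the one that acquired it.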