How is a new thread started?

The sections below work down through the layers one at a time to show exactly how thread startup is implemented.

Implementation at the threading level

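At the threading level, a thread is started with the start() method. Each thread object holds a _started event that records whether the thread has begun running; the event is shared between the parent and the child. At the end of start(), the parent calls _started.wait(), so if the child has not started yet, the parent blocks until it has. The _initialized flag records whether the thread object was properly constructed, that is, whether Thread.__init__() ran.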
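The effect of the _started handshake can be observed from the public API. The sketch below is only an illustration (the function name work is made up): because start() returns only after the child has signalled _started, the thread's ident is already filled in when start() returns, and a second start() on the same object is rejected.

```python
import threading


def work():
    pass


t = threading.Thread(target=work)
ident_before = t.ident        # None: no OS thread exists yet

t.start()                     # returns only after the child has set _started
ident_after = t.ident         # already filled in by the child thread
t.join()

try:
    t.start()                 # a Thread object cannot be started twice
except RuntimeError as exc:
    second_start_error = str(exc)

print(ident_before, isinstance(ident_after, int), second_start_error)
```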

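The threading module also tracks all thread objects globally. On entering start(), the parent first stores the child's thread object in the global _limbo dictionary, which holds threads that are waiting to start. Once the operating system has created the child, the child adds itself to the global _active dictionary and removes itself from _limbo. Both _limbo and _active are shared globals, so every write to them must hold the _active_limbo_lock mutex.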

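The actual startup logic lives in _thread.start_new_thread, which threading aliases as _start_new_thread. Its first argument is the function the child thread runs. The outermost such function is Thread._bootstrap(self), and the real work is done in Thread._bootstrap_inner(self). This wrapper follows the template method pattern and is responsible for thread bookkeeping, calling into user code, and exception handling.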
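The template method structure is what makes subclassing Thread work: the bootstrap code stays fixed and only run() varies. A minimal sketch, assuming a hypothetical Doubler subclass that is not part of the original text:

```python
import threading


class Doubler(threading.Thread):
    """Hypothetical subclass: only run() is customized."""

    def __init__(self, value):
        super().__init__()    # sets _initialized, _started, and so on
        self.value = value
        self.result = None

    def run(self):
        # Called by the bootstrap code inside the child thread.
        self.result = self.value * 2


d = Doubler(21)
d.start()
d.join()
print(d.result)
```

The subclass never touches the bookkeeping or the exception handling; those remain in the inherited bootstrap methods.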

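Thread bookkeeping means updating the _limbo and _active globals in the threading module as the thread changes state. Calling into user code means invoking, in the child thread, either the target function or an overridden Thread.run() method. Exception handling hands any uncaught exception to the hook configured for threads (threading.excepthook), which by default prints the traceback to sys.stderr.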

sequenceDiagram
    participant User as Parent thread
    participant Thread as Thread object
    participant TModule as _thread module
    participant Bootstrap as Child thread
    
    User->>Thread:  t = Thread(target=func, args=(1,2))
    Note over Thread: self._target = func<br/>self._args = (1,2)<br/>self._kwargs = {}<br/>self._started = Event()<br/>self._initialized = True<br/>self._invoke_excepthook = _make_invoke_excepthook()
    
    rect rgb(245, 245, 254)
        Note over User: Start the child thread
        User->>Thread: t.start()
        activate Thread

        Thread->>Thread: check self._initialized
        
        Thread->>Thread: _limbo[self] = self
        Note over Thread: _limbo: global dict of threads waiting to start<br/>holds a thread until it confirms startup
        
        Thread->>TModule: _thread.start_new_thread(self._bootstrap, ())
        TModule-->>Thread: thread_id
        
        Thread->>Thread:  self._started.wait()
        Note over Thread: Event.wait() blocks until the new thread confirms startup
    end
    
    TModule->>Bootstrap:  New thread runs _bootstrap()
    activate Bootstrap

    Bootstrap->>Thread: self._started.set()
    Note over Thread: Event.set() wakes start() and unblocks it
    deactivate Thread
    
    Bootstrap->>Bootstrap: _active[self._ident] = self<br/>del _limbo[self]
    Note over Bootstrap: _active: global dict of active threads<br/>holds the threads that are running
    
    rect rgb(240, 255, 240)
        Note over Bootstrap: try: run user code
        Bootstrap->>Bootstrap: try: 
        Bootstrap->>Bootstrap: self.run()
        Bootstrap->>User: self._target(*self._args, **self._kwargs)
        Note over User: Runs the user function func(1, 2)
    end
    
    rect rgb(255, 240, 240)
        Bootstrap->>Bootstrap: except SystemExit:
        Note over Bootstrap: SystemExit exits silently
        Bootstrap->>Bootstrap: except: 
        Bootstrap->>Bootstrap: self._invoke_excepthook(self)
        Note over Bootstrap: Calls the exception hook for the uncaught exception
        Bootstrap->>Bootstrap: finally: 
        Bootstrap->>Bootstrap: del _active[self._ident]
        Note over Bootstrap: Removes the thread from the active dict, cleans up, and exits
    end
    
    deactivate Bootstrap

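The code behind the sequence diagram above is shown below. It includes details the diagram leaves out, such as daemon inheritance and the exception-hook setup through _stderr and _invoke_excepthook. Note that _limbo maps thread objects to themselves, whereas _active maps each ident (the thread identifier assigned by the operating system) to its thread object. The child obtains its ident by calling self._set_ident() in _bootstrap_inner, which stores it in self._ident. Besides the three responsibilities described above, _bootstrap_inner also installs the trace and profile hooks, which is what lets debuggers such as pdb work inside threads.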

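import sys as _sys
import _thread

_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock

# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
_limbo = {}     # maps Thread object to itself while it waits to start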

class Thread:
    _initialized = False

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        assert group is None, "group argument must be None for now"
        if kwargs is None:
            kwargs = {}
        self._target = target
        self._name = str(name or _newname())
        self._args = args
        self._kwargs = kwargs
        if daemon is not None:
            self._daemonic = daemon
        else:
            self._daemonic = current_thread().daemon
        self._ident = None
        if _HAVE_THREAD_NATIVE_ID:
            self._native_id = None
        self._tstate_lock = None
        self._started = Event()
        self._is_stopped = False
        self._initialized = True
        # Copy of sys.stderr used by self._invoke_excepthook()
        self._stderr = _sys.stderr
        self._invoke_excepthook = _make_invoke_excepthook()
        # For debugging and _after_fork()
        _dangling.add(self)

    def start(self):
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object.

        """
        if not self._initialized:
            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():
            raise RuntimeError("threads can only be started once")
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self._bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self._started.wait()

    def _bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # _bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # _bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self._bootstrap_inner()
        except:
            if self._daemonic and _sys is None:
                return
            raise

    def _bootstrap_inner(self):
        try:
            self._set_ident()
            self._set_tstate_lock()
            if _HAVE_THREAD_NATIVE_ID:
                self._set_native_id()
            self._started.set()
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except:
                self._invoke_excepthook(self)
        finally:
            with _active_limbo_lock:
                try:
                    # We don't call self._delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[get_ident()]
                except:
                    pass
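The bookkeeping above is visible through the public API: threading.enumerate() lists the threads registered in the module, and the ident recorded by the child matches what threading.get_ident() returns inside it. The sketch below uses illustrative names (seen, release, work) and avoids the private _active and _limbo dictionaries.

```python
import threading

seen = {}
release = threading.Event()


def work():
    seen["ident_in_child"] = threading.get_ident()
    seen["current_is_t"] = threading.current_thread() is t
    release.wait()            # keep the thread alive until the parent says so


t = threading.Thread(target=work)
t.start()
listed_while_running = t in threading.enumerate()

release.set()
t.join()
listed_after_join = t in threading.enumerate()

print(listed_while_running, listed_after_join, seen["ident_in_child"] == t.ident)
```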

Implementation at the _thread level

As Thread.start(self) shows, starting a child thread comes down to a call to _thread.start_new_thread, which is implemented in Modules/_threadmodule.c. start_new_thread corresponds to the C function thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs).

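thread_PyThread_start_new_thread packs its arguments into a struct bootstate, which carries the environment the thread's code needs in order to run: the interpreter state and the thread state. It then passes this structure as the argument for t_bootstrap, calls PyThread_start_new_thread from thread_pthread to create the thread, and finally returns the thread identifier.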
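The low-level entry point can also be called directly from Python. The sketch below is illustrative only (child, done, and results are made-up names): it passes a callable, a tuple, and an optional dict, and uses a bare _thread lock for completion because no Thread object, and therefore no join(), exists at this level.

```python
import _thread

done = _thread.allocate_lock()
done.acquire()                # released by the child when it finishes
results = []


def child(a, b, scale=1):
    results.append((a + b) * scale)
    done.release()            # a _thread lock may be released by any thread


ident = _thread.start_new_thread(child, (1, 2), {"scale": 10})
done.acquire()                # block until the child has released the lock

print(isinstance(ident, int), results)
```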

sequenceDiagram
    participant Py as threading module
    participant TNT as thread_PyThread_start_new_thread function
    participant PyState as pystate.c
    participant PThread as thread_pthread.c
    
    Py->>TNT: _thread.start_new_thread(func, args, kwargs)
    activate TNT

    rect rgb(245, 255, 255)
        Note over TNT: Argument checks
        TNT->>TNT: PyCallable_Check(func)<br/>PyTuple_Check(args)<br/>PyDict_Check(kwargs)
    end
    
    rect rgb(240, 255, 240)
        Note over TNT: Allocate and initialize the bootstate
        TNT->>TNT: boot = PyMem_NEW(struct bootstate, 1)
        
        TNT->>PyState: boot->interp = _PyInterpreterState_Get()
        Note over PyState: Get the parent thread's interpreter state
        
        TNT->>PyState: boot->tstate = _PyThreadState_Prealloc(boot->interp)
        Note over PyState: Preallocate the child's thread state
        
        TNT->>TNT: boot->func = func<br/>boot->args = args<br/>boot->keyw = kwargs
    end
    
    rect rgb(240, 240, 255)
        Note over TNT: Reference counts and the GIL
        TNT->>TNT: Py_INCREF(func)<br/>Py_INCREF(args)<br/>Py_XINCREF(kwargs)
        Note over TNT: Keeps the objects from being deallocated
        
        TNT->>TNT:  PyEval_InitThreads()
        Note over TNT: Makes sure the GIL has been created
    end
    
    rect rgb(255, 255, 240)
        Note over TNT,PThread: Create the OS thread
        TNT->>PThread: PyThread_start_new_thread(t_bootstrap, (void*) boot)
        PThread-->>TNT: ident (unsigned long)
    end
    
    TNT-->>Py: PyLong_FromUnsignedLong(ident)
    deactivate TNT

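The implementation is shown below. Apart from the module and data-structure definitions, the control flow matches the sequence diagram above step by step.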

// Modules/_threadmodule.c

static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    // remaining entries omitted
};

static struct PyModuleDef threadmodule = {
    PyModuleDef_HEAD_INIT,
    "_thread",
    thread_doc,
    -1,
    thread_methods,
    NULL,
    NULL,
    NULL,
    NULL
};

struct bootstate {
    PyInterpreterState *interp;
    PyObject *func;
    PyObject *args;
    PyObject *keyw;
    PyThreadState *tstate;
};

static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    // Unpack and validate the arguments
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;
    
    // Unpack 2 to 3 items from the fargs tuple into func, args, and keyw;
    // "start_new_thread" is used only in error messages
    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    if (!PyCallable_Check(func)) {
        PyErr_SetString(PyExc_TypeError,
                        "first arg must be callable");
        return NULL;
    }
    if (!PyTuple_Check(args)) {
        PyErr_SetString(PyExc_TypeError,
                        "2nd arg must be a tuple");
        return NULL;
    }
    if (keyw != NULL && !PyDict_Check(keyw)) {
        PyErr_SetString(PyExc_TypeError,
                        "optional 3rd arg must be a dictionary");
        return NULL;
    }
    // Allocate memory for the bootstate
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    // Give the child thread the parent's interpreter state
    boot->interp = _PyInterpreterState_Get();
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    // Preallocate the child's thread state
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    // Make sure the GIL has been created
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == PYTHREAD_INVALID_THREAD_ID) {
        PyErr_SetString(ThreadError, "can't start new thread");
        Py_DECREF(func);
        Py_DECREF(args);
        Py_XDECREF(keyw);
        PyThreadState_Clear(boot->tstate);
        PyMem_DEL(boot);
        return NULL;
    }
    return PyLong_FromUnsignedLong(ident);
}
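The argument checks at the top of this function can be exercised from Python. The sketch below (noop and bad_calls are illustrative names) collects the exception type for each malformed call; none of these calls gets far enough to create a thread.

```python
import _thread


def noop():
    pass


bad_calls = (
    lambda: _thread.start_new_thread(42, ()),        # first arg not callable
    lambda: _thread.start_new_thread(noop, [1, 2]),  # args is not a tuple
    lambda: _thread.start_new_thread(noop, (), []),  # kwargs is not a dict
    lambda: _thread.start_new_thread(noop),          # fewer than 2 arguments
)

errors = []
for call in bad_calls:
    try:
        call()
    except TypeError as exc:
        errors.append(type(exc).__name__)

print(errors)
```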

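The diagram below traces t_bootstrap in detail. It acquires the GIL and then, in the given execution environment, calls the function packed into struct bootstate. SystemExit is silently ignored; any other uncaught exception is not propagated further but is reported as an unraisable exception through _PyErr_WriteUnraisableMsg. When the call returns, it cleans up the thread state and exits the thread.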

sequenceDiagram
    participant T1 as Parent thread
    participant OS as Operating system
    participant T2 as Child thread
    participant TBoot as t_bootstrap function
    participant GIL as GIL machinery
    
    rect rgb(245, 255, 245)
        Note over T1: Parent creates the child thread
        T1->>OS: pthread_create(t_bootstrap, boot_raw)
        OS->>T2: Create the new thread
        Note over OS,T2: The OS scheduler puts the child on the ready queue
    end
    
    rect rgb(245, 245, 255)
        Note over OS,T2: The OS schedules the child to run
        OS->>T2: Context switch (save the parent's state, restore the child's)
        T2->>TBoot: t_bootstrap(boot_raw)
        activate TBoot
    end
    
    rect rgb(255, 255, 245)
        Note over TBoot: t_bootstrap initialization
        TBoot->>TBoot: boot = (struct bootstate *) boot_raw<br/>tstate = boot->tstate
        
        Note over TBoot: Bind the OS thread ID to the PyThreadState
        TBoot->>TBoot: tstate->thread_id = PyThread_get_thread_ident()
        
        Note over TBoot: Initialize the thread state
        TBoot->>TBoot: _PyThreadState_Init(&_PyRuntime, tstate)
    end
    
    rect rgb(255, 245, 255)
        Note over TBoot,GIL: Acquire the GIL
        TBoot->>GIL: PyEval_AcquireThread(tstate)
        GIL->>GIL:  take_gil(tstate)
        
        GIL->>GIL:  MUTEX_LOCK(gil->mutex)
        Note over GIL: pthread_mutex_lock(gil->mutex)
        
        loop While the GIL is held by another thread
            GIL->>GIL: while (gil->locked)
            
            Note over GIL: Wait on the condition variable, releasing the CPU
            GIL->>GIL: COND_TIMED_WAIT(gil->cond, gil->mutex, interval)
            
            Note over GIL: On timeout with no switch, ask the holder to drop the GIL
            GIL->>GIL: SET_GIL_DROP_REQUEST()
        end
        
        Note over GIL: Mark the GIL as locked
        GIL->>GIL:  _Py_atomic_store_relaxed(&gil->locked, 1)
        
        GIL->>GIL:  MUTEX_UNLOCK(gil->mutex)
        Note over GIL: pthread_mutex_unlock()
        
        Note over GIL: Make tstate the current thread state
        GIL->>GIL: _PyThreadState_Swap(&runtime->gilstate, tstate)
        
        Note over TBoot: Increment the interpreter's thread count
        TBoot->>TBoot: tstate->interp->num_threads++
    end
    
    rect rgb(245, 255, 255)
        Note over TBoot: Call the Python function supplied by the user
        TBoot->>TBoot: res = PyObject_Call(boot->func, boot->args, boot->keyw)
        
        alt res == NULL and the exception is SystemExit
            Note over TBoot: SystemExit is silently ignored
            TBoot->>TBoot: PyErr_Clear()
        else res == NULL with any other exception
            Note over TBoot: Reported as an unraisable exception, not propagated
            TBoot->>TBoot: _PyErr_WriteUnraisableMsg("in thread started by", boot->func)
        end
    end
    
    rect rgb(255, 240, 240)
        Note over TBoot: Release the bootstrap arguments
        TBoot->>TBoot: Py_DECREF(boot->func)<br/>Py_DECREF(boot->args)<br/>Py_XDECREF(boot->keyw)<br/>PyMem_DEL(boot_raw)
        
        Note over TBoot: Decrement the interpreter's thread count
        TBoot->>TBoot: tstate->interp->num_threads--
    end
    
    rect rgb(245, 245, 245)
        Note over TBoot: Clean up the thread state
        TBoot->>TBoot: PyThreadState_Clear(tstate)
        Note over TBoot: Clears the thread state's fields<br/>frame, dict, curexc_*, ...
        
        TBoot->>GIL: PyThreadState_DeleteCurrent()
        Note over GIL: Removes tstate from the interpreter's list<br/>and calls drop_gil() to release the GIL
        GIL->>GIL:  drop_gil()
        Note over GIL: Wakes a waiting thread<br/>COND_SIGNAL(gil->cond)
        
        Note over TBoot: Calls pthread_exit(0)<br/>to exit the thread
        TBoot->>TBoot: PyThread_exit_thread()
        deactivate TBoot 
    end

The corresponding implementation and the functions it depends on are shown below.

// Modules/_threadmodule.c
static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(&_PyRuntime, tstate);
    PyEval_AcquireThread(tstate);
    tstate->interp->num_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        if (PyErr_ExceptionMatches(PyExc_SystemExit))
            /* SystemExit is ignored silently */
            PyErr_Clear();
        else {
            _PyErr_WriteUnraisableMsg("in thread started by", boot->func);
        }
    }
    else {
        Py_DECREF(res);
    }
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    tstate->interp->num_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}

// Python/ceval.c
void
PyEval_AcquireThread(PyThreadState *tstate)
{
    if (tstate == NULL) {
        Py_FatalError("PyEval_AcquireThread: NULL new thread state");
    }

    _PyRuntimeState *runtime = &_PyRuntime;
    struct _ceval_runtime_state *ceval = &runtime->ceval;

    /* Check someone has called PyEval_InitThreads() to create the lock */
    assert(gil_created(&ceval->gil));
    take_gil(ceval, tstate);
    exit_thread_if_finalizing(runtime, tstate);
    if (_PyThreadState_Swap(&runtime->gilstate, tstate) != NULL) {
        Py_FatalError("PyEval_AcquireThread: non-NULL old thread state");
    }
}

// Python/ceval_gil.h
static void
take_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate)
{
    if (tstate == NULL) {
        Py_FatalError("take_gil: NULL tstate");
    }

    struct _gil_runtime_state *gil = &ceval->gil;
    int err = errno;
    MUTEX_LOCK(gil->mutex);

    if (!_Py_atomic_load_relaxed(&gil->locked)) {
        goto _ready;
    }

    while (_Py_atomic_load_relaxed(&gil->locked)) {
        int timed_out = 0;
        unsigned long saved_switchnum;

        saved_switchnum = gil->switch_number;


        unsigned long interval = (gil->interval >= 1 ? gil->interval : 1);
        COND_TIMED_WAIT(gil->cond, gil->mutex, interval, timed_out);
        /* If we timed out and no switch occurred in the meantime, it is time
           to ask the GIL-holding thread to drop it. */
        if (timed_out &&
            _Py_atomic_load_relaxed(&gil->locked) &&
            gil->switch_number == saved_switchnum)
        {
            SET_GIL_DROP_REQUEST(ceval);
        }
    }
_ready:
#ifdef FORCE_SWITCHING
    /* This mutex must be taken before modifying gil->last_holder:
       see drop_gil(). */
    MUTEX_LOCK(gil->switch_mutex);
#endif
    /* We now hold the GIL */
    _Py_atomic_store_relaxed(&gil->locked, 1);
    _Py_ANNOTATE_RWLOCK_ACQUIRED(&gil->locked, /*is_write=*/1);

    if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(&gil->last_holder)) {
        _Py_atomic_store_relaxed(&gil->last_holder, (uintptr_t)tstate);
        ++gil->switch_number;
    }

#ifdef FORCE_SWITCHING
    COND_SIGNAL(gil->switch_cond);
    MUTEX_UNLOCK(gil->switch_mutex);
#endif
    if (_Py_atomic_load_relaxed(&ceval->gil_drop_request)) {
        RESET_GIL_DROP_REQUEST(ceval);
    }
    if (tstate->async_exc != NULL) {
        _PyEval_SignalAsyncExc(ceval);
    }

    MUTEX_UNLOCK(gil->mutex);
    errno = err;
}
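The gil->interval value used for the timed wait in take_gil is exposed to Python as the switch interval. A minimal sketch, assuming the standard sys.getswitchinterval and sys.setswitchinterval functions, which work in seconds while the C field is kept in microseconds:

```python
import sys

original = sys.getswitchinterval()   # seconds; CPython's default is 0.005
sys.setswitchinterval(0.001)         # waiters now time out after about 1 ms
shortened = sys.getswitchinterval()

sys.setswitchinterval(original)      # restore the previous setting
restored = sys.getswitchinterval()

print(original, shortened, restored)
```

A shorter interval makes a waiting thread request the GIL sooner, at the cost of more frequent switches.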

Implementation at the thread_pthread level

PyThread_start_new_thread is CPython's lowest-level thread creation function. It mainly packages its arguments and then calls pthread_create to create the thread.

sequenceDiagram
    participant Caller as _thread.thread_PyThread_start_new_thread function
    participant PST as PyThread_start_new_thread function
    participant OS as Operating system (pthread API)
    participant NewThread as Child thread
    participant Wrapper as pythread_wrapper function
    participant UserFunc as User function
    
    rect rgb(245, 255, 245)
        Note over Caller,PST: The calling thread requests a new thread
        Caller->>PST: PyThread_start_new_thread(t_bootstrap, (void*) boot)
        activate PST
        
        alt !initialized
            Note over PST: First call: initialize the thread system
            PST->>PST: PyThread_init_thread()
            Note over PST: Initializes the condition variable attributes
        end
    end
    
    rect rgb(245, 245, 255)
        Note over PST: Configure thread attributes (stack size, scheduling scope, ...)
        
        alt THREAD_STACK_SIZE or PTHREAD_SYSTEM_SCHED_SUPPORTED
            PST->>OS: pthread_attr_init(&attrs)
            OS-->>PST: attrs
            
            alt THREAD_STACK_SIZE defined
                Note over PST: Read the configured stack size
                PST->>PST: tstate = _PyThreadState_GET()<br/>stacksize = tstate->interp->pythread_stacksize<br/>tss = stacksize ? stacksize : THREAD_STACK_SIZE
                
                alt tss != 0
                    PST->>OS: pthread_attr_setstacksize(&attrs, tss)
                    Note over OS: Sets the thread stack size
                    
                    alt setting fails
                        PST->>OS: pthread_attr_destroy(&attrs)
                        PST-->>Caller: PYTHREAD_INVALID_THREAD_ID
                    end
                end
            end
            
            alt PTHREAD_SYSTEM_SCHED_SUPPORTED defined
                PST->>OS: pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM)
                Note over OS: Sets system-wide scheduling scope
            end
        end
    end
    
    rect rgb(255, 255, 245)
        Note over PST: Prepare the callback structure
        PST->>PST: callback = PyMem_RawMalloc(sizeof(pythread_callback))
        
        alt callback == NULL
            Note over PST: Memory allocation failed
            PST-->>Caller: PYTHREAD_INVALID_THREAD_ID
        end
        
        Note over PST: Pack the function and its argument
        PST->>PST: callback->func = func<br/>callback->arg = arg
    end
    
    rect rgb(255, 245, 255)
        Note over PST,OS: Create the new thread
        PST->>OS: pthread_create(&th, &attrs, pythread_wrapper, callback)
        
        alt creation succeeds
            Note over OS: The operating system creates the new thread
            OS->>NewThread: Thread is created and scheduled
            Note over OS,NewThread: The OS scheduler puts the new thread on the ready queue
            
            OS-->>PST: status = 0
            
            Note over PST: Destroy the thread attribute object
            alt attrs was initialized
                PST->>OS: pthread_attr_destroy(&attrs)
            end
            
            Note over PST: Detach the thread (resources reclaimed automatically)
            PST->>OS: pthread_detach(th)
            Note over OS: Resources are reclaimed automatically when the thread ends
            
            Note over PST: Return the thread identifier
            PST->>PST: thread_id = (unsigned long) th
            PST-->>Caller: thread_id
            
        else creation fails
            OS-->>PST: status != 0
            
            Note over PST: Release resources
            alt attrs was initialized
                PST->>OS: pthread_attr_destroy(&attrs)
            end
            PST->>PST: PyMem_RawFree(callback)
            
            PST-->>Caller: PYTHREAD_INVALID_THREAD_ID
        end
        
        deactivate PST
    end

    rect rgb(245, 255, 255)
        Note over NewThread,Wrapper: The OS schedules the new thread to run
        OS->>NewThread: Context switch to the new thread
        NewThread->>Wrapper: pythread_wrapper(callback)
        activate Wrapper
        
        Note over Wrapper: Unpack the callback data
        Wrapper->>Wrapper: void (*func)(void *) = callback->func<br/>void *func_arg = callback->arg
        
        Note over Wrapper: Free the temporary callback structure
        Wrapper->>Wrapper: PyMem_RawFree(callback)
    end
    
    rect rgb(255, 240, 240)
        Note over Wrapper,UserFunc: Run the user function
        Wrapper->>UserFunc: func(func_arg)
        activate UserFunc
        
        Note over UserFunc: The supplied function's logic runs
        UserFunc-->>Wrapper: Function returns
        deactivate UserFunc
        
        Wrapper-->>NewThread: return NULL
        deactivate Wrapper
    end
    
    rect rgb(245, 245, 245)
        Note over NewThread: Thread exits normally
        NewThread->>OS: pthread_exit(0)
        Note over OS: The operating system reclaims the thread's resources
    end

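The implementation is shown below. pthread (POSIX Threads) is a portable threading API that each operating system implements in its own way (on Linux, NPTL provides it). The relevant types are pthread_t, a thread identifier passed to the pthread_* functions, and pthread_attr_t, a thread attribute object that holds settings such as stack size and scheduling scope and is passed as configuration when a thread is created.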

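The relevant functions are: pthread_attr_init and pthread_attr_destroy, which initialize and destroy a pthread_attr_t; the pthread_attr_set* family, which sets individual attributes; pthread_create, which creates a thread given a location for the thread identifier, the thread attributes, the entry function, and its argument; and pthread_detach, which detaches a thread so that the system reclaims its resources automatically when it ends. This is why a thread started from Python needs no manual lifetime management.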

// Python/thread_pthread.h
typedef struct {
    void (*func) (void *);
    void *arg;
} pythread_callback;

static void *
pythread_wrapper(void *arg)
{
    /* copy func and func_arg and free the temporary structure */
    pythread_callback *callback = arg;
    void (*func)(void *) = callback->func;
    void *func_arg = callback->arg;
    PyMem_RawFree(arg);

    func(func_arg);
    return NULL;
}

unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    pthread_t th;
    int status;
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_t attrs;
#endif
#if defined(THREAD_STACK_SIZE)
    size_t      tss;
#endif

    dprintf(("PyThread_start_new_thread called\n"));
    if (!initialized)
        PyThread_init_thread();

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    if (pthread_attr_init(&attrs) != 0)
        return PYTHREAD_INVALID_THREAD_ID;
#endif
#if defined(THREAD_STACK_SIZE)
    PyThreadState *tstate = _PyThreadState_GET();
    size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
    tss = (stacksize != 0) ? stacksize : THREAD_STACK_SIZE;
    if (tss != 0) {
        if (pthread_attr_setstacksize(&attrs, tss) != 0) {
            pthread_attr_destroy(&attrs);
            return PYTHREAD_INVALID_THREAD_ID;
        }
    }
#endif
#if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
#endif

    pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback));

    if (callback == NULL) {
      return PYTHREAD_INVALID_THREAD_ID;
    }

    callback->func = func;
    callback->arg = arg;

    status = pthread_create(&th,
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
                             &attrs,
#else
                             (pthread_attr_t*)NULL,
#endif
                             pythread_wrapper, callback);

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_destroy(&attrs);
#endif

    if (status != 0) {
        PyMem_RawFree(callback);
        return PYTHREAD_INVALID_THREAD_ID;
    }

    pthread_detach(th);

#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
    return (unsigned long) th;
#else
    return (unsigned long) *(unsigned long *) &th;
#endif
}
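The pythread_stacksize value read in the THREAD_STACK_SIZE branch is what threading.stack_size() sets from Python. The sketch below assumes a platform that accepts a 512 KiB stack; the except clause covers platforms that reject the size or do not allow changing it.

```python
import threading

results = []

previous = threading.stack_size()        # 0 means "use the platform default"
try:
    threading.stack_size(512 * 1024)     # applies to threads created from now on
    changed = True
except (ValueError, RuntimeError):
    changed = False                      # size rejected or change unsupported

t = threading.Thread(target=lambda: results.append(threading.stack_size()))
t.start()
t.join()

if changed:
    threading.stack_size(previous)       # restore the earlier setting

print(changed, results)
```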

How do you wait for a thread?

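The parent waits for a child to finish, or for a timeout to expire, by calling join on the child's thread object. join first validates the call and then tries to acquire the child's _tstate_lock. If the lock has already been released, the child has finished; otherwise the parent blocks until the child ends and the lock is released, or until the timeout expires.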

sequenceDiagram
    participant Caller as Parent thread
    participant ThreadObj as Thread object
    participant TStateLock as _tstate_lock
    participant WorkerThread as Child thread
    participant CRuntime as _thread.t_bootstrap function
    
    rect rgb(245, 255, 245)
        Note over Caller: Parent starts waiting for the child
        Caller->>ThreadObj: t.join(timeout)
        activate ThreadObj
        
        Note over ThreadObj: Validate arguments and state
        ThreadObj->>ThreadObj: if not self._initialized:<br/>raise RuntimeError("Thread.__init__() not called")
        ThreadObj->>ThreadObj: if not self._started.is_set():<br/>raise RuntimeError("cannot join thread before it is started")
        ThreadObj->>ThreadObj: if self is current_thread():<br/>raise RuntimeError("cannot join current thread")
    end
    
    rect rgb(245, 245, 255)
        Note over ThreadObj: Choose the waiting strategy
        alt timeout is None (wait indefinitely)
            ThreadObj->>ThreadObj: _wait_for_tstate_lock()
            Note over ThreadObj: block=True, timeout=-1
        else timeout is not None (timed wait)
            ThreadObj->>ThreadObj: _wait_for_tstate_lock(timeout=max(timeout, 0))
        end
    end
    
    rect rgb(255, 255, 245)
        Note over ThreadObj,TStateLock: Try to acquire self._tstate_lock
        ThreadObj->>TStateLock: lock = self._tstate_lock
        
        alt lock is None (child already finished)
            ThreadObj->>ThreadObj: assert self._is_stopped
            ThreadObj-->>Caller: Return immediately
        else lock exists (child still running)
            Note over TStateLock: Try to acquire the lock
        end
    end
    
    rect rgb(255, 245, 255)
        Note over TStateLock,WorkerThread: Child keeps running
        
        activate WorkerThread
        WorkerThread->>WorkerThread: Run user code
        
        Note over TStateLock: _tstate_lock is held by the child
        TStateLock->>TStateLock: lock.acquire(block, timeout)
        Note over TStateLock: Parent blocks here until the lock is released
        
        Note over Caller,TStateLock: Parent sleeps; the OS schedules other threads
    end
    
    rect rgb(245, 255, 255)
        Note over WorkerThread: Worker thread is about to end
        
        WorkerThread->>WorkerThread: _bootstrap_inner() finishes
        WorkerThread->>WorkerThread: finally: del _active[get_ident()]
        
        WorkerThread->>CRuntime: Thread exit cleanup
        deactivate WorkerThread
    end
    
    rect rgb(255, 240, 245)
        Note over CRuntime: Thread cleanup and lock release
        
        CRuntime->>CRuntime: PyThreadState_Clear(tstate)
        Note over CRuntime: Clears the Python thread state
        CRuntime->>CRuntime: frame, dict, curexc_*, ...
        
        CRuntime->>CRuntime: Remove the thread state from the interpreter's list
        
        Note over CRuntime: Release _tstate_lock
        CRuntime->>TStateLock: tstate_lock.release()
        Note over TStateLock: Lock released; the waiting thread wakes up
    end
    
    rect rgb(245, 255, 240)
        Note over TStateLock,Caller: Parent is woken up
        
        TStateLock-->>ThreadObj: lock.acquire() returns True
        Note over ThreadObj: Lock acquired, so the child has finished
        
        ThreadObj->>TStateLock: lock.release()
        
        ThreadObj->>ThreadObj: self._stop()
        Note over ThreadObj: Marks the thread stopped and drops the lock reference
        Note over ThreadObj: self._is_stopped = True<br/>self._tstate_lock = None
    end
    
    rect rgb(240, 255, 240)
        ThreadObj-->>Caller: Returns None
        deactivate ThreadObj
        
        Note over Caller: The child has finished
    end
    
    rect rgb(255, 245, 245)
        Note over Caller: Timeout case
        
        alt timeout expires
            Note over TStateLock: lock.acquire(timeout) returns False
            ThreadObj-->>Caller: Returns None (child still running)
            Note over Caller: Call thread.is_alive() to check
        end
    end

The implementation of join is shown below. At its core it is an example of using a mutex for communication between threads.

class Thread:
    _initialized = False

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        self._tstate_lock = None
        self._started = Event()
        self._is_stopped = False
        self._initialized = True
    
    def join(self, timeout=None):
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        is_alive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self._started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if timeout is None:
            self._wait_for_tstate_lock()
        else:
            # the behavior of a negative timeout isn't documented, but
            # historically .join(timeout=x) for x<0 has acted as if timeout=0
            self._wait_for_tstate_lock(timeout=max(timeout, 0))

    def _wait_for_tstate_lock(self, block=True, timeout=-1):
        # Issue #18808: wait for the thread state to be gone.
        # At the end of the thread's life, after all knowledge of the thread
        # is removed from C data structures, C code releases our _tstate_lock.
        # This method passes its arguments to _tstate_lock.acquire().
        # If the lock is acquired, the C code is done, and self._stop() is
        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
        lock = self._tstate_lock
        if lock is None:  # already determined that the C code is done
            assert self._is_stopped
        elif lock.acquire(block, timeout):
            lock.release()
            self._stop()

Unlike other locks, _tstate_lock is released automatically when the thread ends. The lock is created by _thread._set_sentinel when the child thread starts and is immediately acquired by the child.

import _thread

_set_sentinel = _thread._set_sentinel

class Thread:
    def _set_tstate_lock(self):
        """
        Set a lock object which will be released by the interpreter when
        the underlying thread state (see pystate.h) gets deleted.
        """
        self._tstate_lock = _set_sentinel()
        self._tstate_lock.acquire()

        if not self.daemon:
            with _shutdown_locks_lock:
                _shutdown_locks.add(self._tstate_lock)

    def _bootstrap_inner(self):
        try:
            # earlier code omitted
            self._set_tstate_lock()
            # remaining code omitted
        finally:
            # cleanup code omitted
            pass

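Inside _set_sentinel(), the lock is created by newlockobject(). A weak reference to it is stored in the on_delete_data field of the child's thread state, and the teardown callback on_delete is set to release_sentinel. When the thread finishes, t_bootstrap calls PyThreadState_Clear(tstate), which triggers release_sentinel and thereby releases the lock.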

// Modules/_threadmodule.c
static PyObject *
thread__set_sentinel(PyObject *self, PyObject *Py_UNUSED(ignored))
{
    PyObject *wr;
    PyThreadState *tstate = PyThreadState_Get();
    lockobject *lock;

    if (tstate->on_delete_data != NULL) {
        /* We must support the re-creation of the lock from a
           fork()ed child. */
        assert(tstate->on_delete == &release_sentinel);
        wr = (PyObject *) tstate->on_delete_data;
        tstate->on_delete = NULL;
        tstate->on_delete_data = NULL;
        Py_DECREF(wr);
    }
    lock = newlockobject();
    if (lock == NULL)
        return NULL;
    /* The lock is owned by whoever called _set_sentinel(), but the weakref
       hangs to the thread state. */
    wr = PyWeakref_NewRef((PyObject *) lock, NULL);
    if (wr == NULL) {
        Py_DECREF(lock);
        return NULL;
    }
    tstate->on_delete_data = (void *) wr;
    tstate->on_delete = &release_sentinel;
    return (PyObject *) lock;
}

release_sentinel is implemented as follows; it releases the lock by calling PyThread_release_lock.

// Modules/_threadmodule.c
static void
release_sentinel(void *wr_raw)
{
    PyObject *wr = _PyObject_CAST(wr_raw);
    /* Tricky: this function is called when the current thread state
       is being deleted.  Therefore, only simple C code can safely
       execute here. */
    PyObject *obj = PyWeakref_GET_OBJECT(wr);
    lockobject *lock;
    if (obj != Py_None) {
        assert(Py_TYPE(obj) == &Locktype);
        lock = (lockobject *) obj;
        if (lock->locked) {
            PyThread_release_lock(lock->lock_lock);
            lock->locked = 0;
        }
    }
    /* Deallocating a weakref with a NULL callback only calls
       PyObject_GC_Del(), which can't call any Python code. */
    Py_DECREF(wr);
}
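
The sentinel idea can be imitated in pure Python. The sketch below is an assumption-laden model, not CPython's implementation: MiniThread and its _done_lock attribute are hypothetical names, and the lock is released in a finally block rather than by the interpreter when the thread state is torn down.

```python
import threading

class MiniThread:
    """Hypothetical, simplified model of the _tstate_lock protocol."""

    def __init__(self, target):
        self._target = target
        self._started = threading.Event()
        self._done_lock = None          # stands in for _tstate_lock

    def _bootstrap(self):
        # The child creates the lock and holds it for its whole lifetime.
        self._done_lock = threading.Lock()
        self._done_lock.acquire()
        self._started.set()
        try:
            self._target()
        finally:
            # Stand-in for release_sentinel(): free the lock at thread exit.
            self._done_lock.release()

    def start(self):
        threading.Thread(target=self._bootstrap).start()
        self._started.wait()            # the lock exists once this returns

    def join(self, timeout=-1):
        # Same shape as _wait_for_tstate_lock(): acquire, then release.
        if self._done_lock.acquire(True, timeout):
            self._done_lock.release()
            return True
        return False

gate = threading.Event()                # keeps the child running for the demo
mini = MiniThread(gate.wait)
mini.start()
finished_early = mini.join(timeout=0.01)   # child still holds the lock
gate.set()
finished_late = mini.join()                # lock released when the child ends
```

The real mechanism is stronger than this model because the release happens in C during thread state teardown, so it cannot be skipped by Python-level code.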

How do threads communicate safely?

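One example is the _started event, which the child thread uses to tell the parent that it has started successfully; it is implemented with Event. The parent blocks in wait() until the event occurs, and the child calls set() to notify every waiting thread that it has.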

class Thread:
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        self._started = Event()

    def start(self):
        # code omitted
        self._started.wait()
    
    def _bootstrap_inner(self):
        # code omitted
        self._started.set()

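Event maintains a flag, _flag, that is true once the event has occurred and false otherwise. It also holds a condition variable, _cond, which serializes access to _flag and blocks waiting threads.

set(self) sets _flag to True and notifies every thread blocked on _cond that the event has occurred. wait(self, timeout) returns immediately if _flag is True; otherwise it parks the current thread in the wait queue of _cond until the event occurs or the timeout expires.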

import _thread

_allocate_lock = _thread.allocate_lock
Lock = _allocate_lock

class Event:
    """Class implementing event objects.

    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.  The flag is initially false.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
        self._cond.__init__(Lock())

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    isSet = is_set

    def set(self):
        """Set the internal flag to true.

        All threads waiting for it to become true are awakened. Threads
        that call wait() once the flag is true will not block at all.

        """
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, threads calling wait() will block until set() is called to
        set the internal flag to true again.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return immediately. Otherwise,
        block until another thread calls set() to set the flag to true, or until
        the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        This method returns the internal flag on exit, so it will always return
        True except if a timeout is given and the operation times out.

        """
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled
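
The return values of wait() described in the docstring can be checked with a small handshake. This is a sketch using the public threading.Event API; the ready event, the log list, and the child function are illustrative names only.

```python
import threading

ready = threading.Event()
log = []

def child():
    log.append("child running")
    ready.set()                      # wake every thread blocked in ready.wait()

# Nobody has set the flag yet, so a timed wait gives up and returns False.
timed_out_result = ready.wait(timeout=0.01)

t = threading.Thread(target=child)
t.start()
signaled_result = ready.wait()       # returns True once child() calls set()
t.join()

ready.clear()                        # reset the flag for reuse
flag_after_clear = ready.is_set()
```

This mirrors how start() and _bootstrap_inner() cooperate through _started, except that start() always waits without a timeout.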

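Condition provides the mutual exclusion, blocking, and notification described above. Event constructs _cond with Lock(), a non-reentrant mutex: a thread that already holds the lock still blocks if it tries to acquire it again.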
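
The difference between a non-reentrant Lock and a reentrant RLock is easy to see with non-blocking acquires. A minimal sketch, using only the public threading API:

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

# Lock: the second acquire by the same thread does not succeed.
lock.acquire()
lock_reacquired = lock.acquire(blocking=False)
lock.release()

# RLock: the owning thread may acquire again; each acquire needs a release.
rlock.acquire()
rlock_reacquired = rlock.acquire(blocking=False)
rlock.release()
rlock.release()
```

With blocking=True and no timeout, the second lock.acquire() would block forever, which is why the sketch uses a non-blocking attempt.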

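The with statement maps to the __enter__ and __exit__ methods, which delegate to the same methods on _lock to acquire and release it. wait creates a new non-reentrant lock, waiter, acquires it once, appends it to the _waiters queue, releases the underlying lock, and then blocks the calling thread by acquiring waiter a second time. notify and notify_all release the locks stored in _waiters one by one, waking the blocked threads.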

import _thread
from collections import deque as _deque
from itertools import islice as _islice

_allocate_lock = _thread.allocate_lock

class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(0):
            self._lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
        self.notify(len(self._waiters))
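
The core trick in wait() and notify() is that a plain lock may be released by a thread other than the one that acquired it. The sketch below isolates that trick; it is not the Condition class itself, and the names waiters, registered, wait_for_notify, and notify_all are chosen for this illustration.

```python
import threading
from collections import deque

waiters = deque()                       # plays the role of Condition._waiters
registered = threading.Semaphore(0)     # only makes the demo deterministic
woken = []

def wait_for_notify(name):
    waiter = threading.Lock()
    waiter.acquire()                    # first acquire succeeds immediately
    waiters.append(waiter)
    registered.release()                # tell the main thread we are queued
    waiter.acquire()                    # second acquire blocks until released
    woken.append(name)

def notify_all():
    # Releasing each queued lock unblocks the thread parked on it.
    while waiters:
        waiters.popleft().release()

threads = [threading.Thread(target=wait_for_notify, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
registered.acquire()                    # wait until both threads are queued
registered.acquire()
notify_all()
for t in threads:
    t.join()
```

Condition adds what this sketch leaves out: the shared underlying lock that protects the predicate, timeouts, and the RuntimeError checks for callers that do not hold the lock.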

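The key building block is the mutex, which the _thread module implements; for example, _allocate_lock creates non-reentrant lock objects. _allocate_lock maps to thread_PyThread_allocate_lock, which calls newlockobject to create the lock.

newlockobject first creates a lockobject instance, then calls PyThread_allocate_lock to create the underlying lock and stores it in the lock_lock field of lockobject, sets the lock state locked to 0, and finally returns the lockobject instance.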

// Modules/_threadmodule.c
static lockobject *
newlockobject(void)
{
    lockobject *self;
    self = PyObject_New(lockobject, &Locktype);
    if (self == NULL)
        return NULL;
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    if (self->lock_lock == NULL) {
        Py_DECREF(self);
        PyErr_SetString(ThreadError, "can't allocate lock");
        return NULL;
    }
    return self;
}

static PyObject *
thread_PyThread_allocate_lock(PyObject *self, PyObject *Py_UNUSED(ignored))
{
    return (PyObject *) newlockobject();
}

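lockobject is defined as follows. Its type is Locktype, whose methods include acquire and release for taking and releasing the lock, plus __enter__ and __exit__ to support the with statement.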

// Modules/_threadmodule.c
static PyMethodDef lock_methods[] = {
    {"acquire",      (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"release",      (PyCFunction)lock_PyThread_release_lock,
     METH_NOARGS, release_doc},
    {"__enter__",    (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"__exit__",    (PyCFunction)lock_PyThread_release_lock,
     METH_VARARGS, release_doc},
    {NULL,           NULL}              /* sentinel */
};

static PyTypeObject Locktype = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    "_thread.lock",                     /*tp_name*/
    // remaining fields omitted
    lock_methods,                       /*tp_methods*/
};

typedef struct {
    PyObject_HEAD
    PyThread_type_lock lock_lock;
    PyObject *in_weakreflist;
    char locked; /* for sanity checking */
} lockobject;

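As shown above, lockobject holds a PyThread_type_lock, which is the real lock implementation. In the implementation shown below, PyThread_type_lock actually points to a pthread_lock, which keeps the lock state locked, the condition variable lock_released, and the mutex mut. PyThread_allocate_lock initializes them through the pthread_*_init functions, so the lock itself is implemented separately for each platform.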

// Python/thread_pthread.h
typedef void *PyThread_type_lock;

typedef struct {
    char             locked; /* 0=unlocked, 1=locked */
    /* a <cond, mutex> pair to handle an acquire of a locked lock */
    pthread_cond_t   lock_released;
    pthread_mutex_t  mut;
} pthread_lock;

int
_PyThread_cond_init(PyCOND_T *cond)
{
    return pthread_cond_init(cond, condattr_monotonic);
}

PyThread_type_lock
PyThread_allocate_lock(void)
{
    pthread_lock *lock;
    int status, error = 0;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();

    lock = (pthread_lock *) PyMem_RawMalloc(sizeof(pthread_lock));
    if (lock) {
        memset((void *)lock, '\0', sizeof(pthread_lock));
        lock->locked = 0;

        status = pthread_mutex_init(&lock->mut, NULL);
        CHECK_STATUS_PTHREAD("pthread_mutex_init");
        /* Mark the pthread mutex underlying a Python mutex as
           pure happens-before.  We can't simply mark the
           Python-level mutex as a mutex because it can be
           acquired and released in different threads, which
           will cause errors. */
        _Py_ANNOTATE_PURE_HAPPENS_BEFORE_MUTEX(&lock->mut);

        status = _PyThread_cond_init(&lock->lock_released);
        CHECK_STATUS_PTHREAD("pthread_cond_init");

        if (error) {
            PyMem_RawFree((void *)lock);
            lock = 0;
        }
    }

    dprintf(("PyThread_allocate_lock() -> %p\n", (void *)lock));
    return (PyThread_type_lock) lock;
}

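acquire is implemented by lock_PyThread_acquire_lock, which calls acquire_timed to take the lock. A return value of PY_LOCK_ACQUIRED means the lock was obtained, and acquire returns True; PY_LOCK_FAILURE yields False, and PY_LOCK_INTR propagates the exception raised by a signal handler.

acquire_timed wraps lock acquisition so that it can be interrupted: a signal, such as the one that raises KeyboardInterrupt, may interrupt the wait, after which the acquisition is retried. Concretely, it first calls PyThread_acquire_lock_timed in non-blocking mode; if that fails with PY_LOCK_FAILURE and the timeout is non-zero, it releases the GIL and retries in blocking mode. If the blocking attempt is interrupted, PyThread_acquire_lock_timed returns PY_LOCK_INTR and Py_MakePendingCalls runs the pending signal handlers. If a handler raises an exception, PY_LOCK_INTR is returned to the caller; otherwise the remaining timeout is recomputed and the acquisition is retried.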

static PyObject *
lock_PyThread_acquire_lock(lockobject *self, PyObject *args, PyObject *kwds)
{
    _PyTime_t timeout;
    PyLockStatus r;

    if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
        return NULL;

    r = acquire_timed(self->lock_lock, timeout);
    if (r == PY_LOCK_INTR) {
        return NULL;
    }

    if (r == PY_LOCK_ACQUIRED)
        self->locked = 1;
    return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}

/* Helper to acquire an interruptible lock with a timeout.  If the lock acquire
 * is interrupted, signal handlers are run, and if they raise an exception,
 * PY_LOCK_INTR is returned.  Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
 * are returned, depending on whether the lock can be acquired within the
 * timeout.
 */
static PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
    PyLockStatus r;
    _PyTime_t endtime = 0;
    _PyTime_t microseconds;

    if (timeout > 0)
        endtime = _PyTime_GetMonotonicClock() + timeout;

    do {
        microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);

        /* first a simple non-blocking try without releasing the GIL */
        r = PyThread_acquire_lock_timed(lock, 0, 0);
        if (r == PY_LOCK_FAILURE && microseconds != 0) {
            Py_BEGIN_ALLOW_THREADS
            r = PyThread_acquire_lock_timed(lock, microseconds, 1);
            Py_END_ALLOW_THREADS
        }

        if (r == PY_LOCK_INTR) {
            /* Run signal handlers if we were interrupted.  Propagate
             * exceptions from signal handlers, such as KeyboardInterrupt, by
             * passing up PY_LOCK_INTR.  */
            if (Py_MakePendingCalls() < 0) {
                return PY_LOCK_INTR;
            }

            /* If we're using a timeout, recompute the timeout after processing
             * signals, since those can take time.  */
            if (timeout > 0) {
                timeout = endtime - _PyTime_GetMonotonicClock();

                /* Check for negative values, since those mean block forever.
                 */
                if (timeout < 0) {
                    r = PY_LOCK_FAILURE;
                }
            }
        }
    } while (r == PY_LOCK_INTR);  /* Retry if we were interrupted. */

    return r;
}
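
From Python, the outcomes of acquire_timed surface as the return value of Lock.acquire(). A minimal sketch of the three observable cases, using only the public threading API:

```python
import threading

lock = threading.Lock()
lock.acquire()                               # the lock is now held

# Timeout of zero: only the non-blocking attempt is made, and it fails.
nonblocking_result = lock.acquire(blocking=False)

# Positive timeout: the blocking path runs and gives up after the deadline.
timed_result = lock.acquire(timeout=0.05)

lock.release()

# Free lock: the initial non-blocking attempt already succeeds.
free_result = lock.acquire(timeout=0.05)
lock.release()
```

The interrupted case is not shown because it needs a signal delivered while the main thread is blocked, which is hard to reproduce deterministically in a short example.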

acquire_timed relies on PyThread_acquire_lock_timed to actually take the lock.

PyLockStatus
PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds,
                            int intr_flag)
{
    PyLockStatus success = PY_LOCK_FAILURE;
    pthread_lock *thelock = (pthread_lock *)lock;
    int status, error = 0;

    dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) called\n",
             lock, microseconds, intr_flag));

    if (microseconds == 0) {
        status = pthread_mutex_trylock( &thelock->mut );
        if (status != EBUSY)
            CHECK_STATUS_PTHREAD("pthread_mutex_trylock[1]");
    }
    else {
        status = pthread_mutex_lock( &thelock->mut );
        CHECK_STATUS_PTHREAD("pthread_mutex_lock[1]");
    }
    if (status == 0) {
        if (thelock->locked == 0) {
            success = PY_LOCK_ACQUIRED;
        }
        else if (microseconds != 0) {
            struct timespec abs;
            if (microseconds > 0) {
                _PyThread_cond_after(microseconds, &abs);
            }
            /* continue trying until we get the lock */

            /* mut must be locked by me -- part of the condition
             * protocol */
            while (success == PY_LOCK_FAILURE) {
                if (microseconds > 0) {
                    status = pthread_cond_timedwait(
                        &thelock->lock_released,
                        &thelock->mut, &abs);
                    if (status == 1) {
                        break;
                    }
                    if (status == ETIMEDOUT)
                        break;
                    CHECK_STATUS_PTHREAD("pthread_cond_timedwait");
                }
                else {
                    status = pthread_cond_wait(
                        &thelock->lock_released,
                        &thelock->mut);
                    CHECK_STATUS_PTHREAD("pthread_cond_wait");
                }

                if (intr_flag && status == 0 && thelock->locked) {
                    /* We were woken up, but didn't get the lock.  We probably received
                     * a signal.  Return PY_LOCK_INTR to allow the caller to handle
                     * it and retry.  */
                    success = PY_LOCK_INTR;
                    break;
                }
                else if (status == 0 && !thelock->locked) {
                    success = PY_LOCK_ACQUIRED;
                }
            }
        }
        if (success == PY_LOCK_ACQUIRED) thelock->locked = 1;
        status = pthread_mutex_unlock( &thelock->mut );
        CHECK_STATUS_PTHREAD("pthread_mutex_unlock[1]");
    }

    if (error) success = PY_LOCK_FAILURE;
    dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) -> %d\n",
             lock, microseconds, intr_flag, success));
    return success;
}
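
The structure of pthread_lock, a locked flag guarded by a mutex and a condition variable, can be modelled in Python. The sketch below is only an analogy: FlagLock is a hypothetical class, and it is built on threading.Condition, which in CPython is itself built on the very lock being modelled, so it illustrates the protocol rather than replacing it.

```python
import threading

class FlagLock:
    """Hypothetical model of pthread_lock: a flag plus a <cond, mutex> pair."""

    def __init__(self):
        # The Condition bundles the roles of `mut` and `lock_released`.
        self._cond = threading.Condition(threading.Lock())
        self._locked = False

    def acquire(self, timeout=None):
        with self._cond:                         # like pthread_mutex_lock(&mut)
            # wait_for() re-checks the flag after every wake-up, like the
            # while loop around pthread_cond_wait / pthread_cond_timedwait.
            got = self._cond.wait_for(lambda: not self._locked, timeout)
            if got:
                self._locked = True
            return got

    def release(self):
        with self._cond:
            self._locked = False
            self._cond.notify()                  # like signalling lock_released

flag_lock = FlagLock()
first = flag_lock.acquire()                      # flag was clear
second = flag_lock.acquire(timeout=0.01)         # flag set: times out

# Any thread may release, just as with a real _thread.lock.
releaser = threading.Thread(target=flag_lock.release)
releaser.start()
releaser.join()
third = flag_lock.acquire(timeout=1)
```

Keeping the flag separate from the mutex is what lets the lock be released by a different thread than the one that acquired it, which a bare pthread mutex does not allow.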