Python内核阅读(二十): 多线程机制(下)

Python 2017-09-01

起步

当创建的线程进入解释器时, 其调度就交给了python级的线程调度.

标准调度

当主线程和子线程进入解释器后, python的线程调度机制就可以在它们之间进行切换. python的线程调度机制是内建在函数 PyEval_EvalFrameEx 之中的.

[ceval.c]
PyObject * PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
    PyThreadState *tstate = PyThreadState_GET();
    return tstate->interp->eval_frame(f, throwflag);
}

这边的 tstate->interp->eval_frame 就是 _PyEval_EvalFrameDefault :

[ceval.c]
/* This single variable consolidates all requests to break out of the fast path
   in the eval loop. */
static _Py_atomic_int eval_breaker = {0};

PyObject* _Py_HOT_FUNCTION _PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
    ...
    why = WHY_NOT;
    for (;;) {
        ...
        if (_Py_atomic_load_relaxed(&gil_drop_request)) {
            // 撤销当前线程状态对象, 释放GIL, 给别的线程一个机会
            if (PyThreadState_Swap(NULL) != tstate)
                Py_FatalError("ceval: tstate mix-up");
            drop_gil(tstate);   // 释放GIL
            // 别的线程已经在执行了, 需要重新申请GIL, 等待下一次被调度.
            take_gil(tstate);
            /* Check if we should make a quick exit. */
            if (_Py_Finalizing && _Py_Finalizing != tstate) {
                drop_gil(tstate);
                PyThread_exit_thread();
            }
            if (PyThreadState_Swap(tstate) != NULL)
                Py_FatalError("ceval: orphan tstate");
        }
        ...
    fast_next_opcode:
        ...
    }
}

当达到某个条件时, GIL会被释放 drop_gil(tstate); 而这释放, 就可能立马被其他线程获得, 主线程将自身挂起. 需要等待其他线程释放GIL, 因此要重新申请GIL的使用权 take_gil(tstate); .

阻塞调度

python需要另一种触发线程调度方式, 就是阻塞调度, 它的基本思想是: 在线程A通过某种操作, 比如等待输入, 将自身阻塞后, python应该将等待GIL的线程B唤醒.

例如, 在主线程和子线程中都有 time.sleep(1) 的调用. 假如子线程调用的 time.sleep(1) , 那么子线程将释放GIL, 挂起自身, python唤醒主线程. 同样在主线程中sleep时唤醒子线程. 因此有这样一个场景, 就是程序有时候希望讲自己挂起.

这类阻塞也包括 raw_input 等待用户输入, 但在此我们仅通过sleep操作, 看看python是如何实现阻塞调度的.

[timemodule.c]
static PyObject * time_sleep(PyObject *self, PyObject *obj)
{
    _PyTime_t secs;
    if (_PyTime_FromSecondsObject(&secs, obj, _PyTime_ROUND_CEILING))
        return NULL;
    if (secs < 0) {
        PyErr_SetString(PyExc_ValueError,
                        "sleep length must be non-negative");
        return NULL;
    }
    if (pysleep(secs) != 0)
        return NULL;
    Py_RETURN_NONE;
}

_PyTime_t 结构是 int64_t 的别名.

[timemodule.c]
static int pysleep(_PyTime_t secs)
{
    _PyTime_t deadline, monotonic;
    struct timeval timeout;
    int err = 0;
    deadline = _PyTime_GetMonotonicClock() + secs;

    do {
        if (_PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) < 0)
            return -1;

        Py_BEGIN_ALLOW_THREADS
        err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
        Py_END_ALLOW_THREADS

        if (err == 0)
            break;

        if (errno != EINTR) {
            PyErr_SetFromErrno(PyExc_OSError);
            return -1;
        }

        /* sleep was interrupted by SIGINT */
        if (PyErr_CheckSignals())
            return -1;

        monotonic = _PyTime_GetMonotonicClock();
        secs = deadline - monotonic;
        if (secs < 0)
            break;
        /* retry with the recomputed delay */
    } while (1);

    return 0;
}

刚开始我也以为是使用c语言库中 sleep(n) 来完成休眠, 实际上却是用 select() 来. 两者不同的地方是 sleep 是阻塞方式, 而 select 是可以非阻塞方式. 阻塞方式是要等待事件的发生; 而非阻塞方式则是一旦执行肯定返回, 当事件发生时, 去则色方式相同, 当事件未发生, 会返回一个代码告知. 在linux平台上, python用select实现了 time.sleep(n) 的阻塞方式.

struct timeval 是一个比较常用的结构,用来代表时间值,有两个成员,一个是秒数,另一个是毫秒数. 这个结构体在 _PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) 函数里被赋值. 这里的 _PyTime_ROUND_CEILING 值为1. 如果是 time.sleep(1) 那么参数 secs = 1 * 10^9 它会转化为以纳秒为单位. 经过 _PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) 设置, timeout的结构为:

// 假设是 time.sleep(1) 的情况
timeout.tv_sec = 1
timeout.tv_usec = 0

利用这样一个结构体调用 select 函数.

select 函数说明

函数原型为:

int select(int maxfdp, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval*timeout); 

有三个参数都是 df_set 类型的. struct fd_set 可以理解为一个集合,这个集合中存放的是文件描述符(filedescriptor),即文件句柄. 它可以是任意文件, 设备,管道,FIFO等. 虽然我们在使用select函数时大都传了 0, 但还是需要看看各个参数都代表了什么.

  1. int maxfdp 是一个整数值,是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1,不能错!在Windows中这个参数的值无所谓,可以设置不正确。
  2. fd_set * readfds 是指向fd_set结构的指针, 监视这些文件描述符的读变化的,即我们关心是否可以从这些文件中读取数据了. 如果在timeout时间内没有可读文件, 会返回0, 如哦发生错误返回负值. 可以传入NULL值,表示不关心任何文件的读变化。
  3. fd_set * writefds 是指向fd_set结构的指针, 监控文件是否可写, 与上一个参数差不多. 可以传入NULL值,表示不关心任何文件的写变化。
  4. fd_set * errorfds 同上面两个参数的意图,用来监视文件错误异常。
  5. struct timeval * timeout 是select的超时时间,这个参数至关重要,它可以使select处于三种状态:
    • 第一,若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;
    • 第二,若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;
    • 第三,timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。

返回值

  • 负值:select错误
  • 正值:某些文件可读写或出错
  • 0:等待超时,没有可读写或错误的文件

综上, select是既可以阻塞, 也可以是非阻塞, 当timeout设为0时, 它是一个非阻塞, 当timeout大于0, 会在timeout时间内阻塞.

select 函数的调用前, 需要将线程状态对象进行保存, 释放GIL以便进行线程调度. 源码中可以看到调用的select函数被两个宏包裹:

Py_BEGIN_ALLOW_THREADS
err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
Py_END_ALLOW_THREADS

将宏展开:

[ceval.h]
#define Py_BEGIN_ALLOW_THREADS { \
                        PyThreadState *_save; \
                        _save = PyEval_SaveThread();
#define Py_BLOCK_THREADS        PyEval_RestoreThread(_save);
#define Py_UNBLOCK_THREADS      _save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS    PyEval_RestoreThread(_save); \
                 }

[ceval.c]
PyThreadState * PyEval_SaveThread(void)
{
    PyThreadState *tstate = PyThreadState_Swap(NULL);
    if (tstate == NULL)
        Py_FatalError("PyEval_SaveThread: NULL tstate");
    if (gil_created())
        drop_gil(tstate);
    return tstate;
}

void PyEval_RestoreThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_RestoreThread: NULL tstate");
    if (gil_created()) {
        int err = errno;
        take_gil(tstate);
        /* _Py_Finalizing is protected by the GIL */
        if (_Py_Finalizing && tstate != _Py_Finalizing) {
            drop_gil(tstate);
            PyThread_exit_thread();
            assert(0);  /* unreachable */
        }
        errno = err;
    }
    PyThreadState_Swap(tstate);
}

两个宏中, Py_BEGIN_ALLOW_THREADS 是保存线程状态对象, 释放GIL. Py_END_ALLOW_THREADS 是重新申请GIL.

这就是一个线程主动放弃GIL的例子, 而不是靠python强制线程挂起进行调度. 只要在 Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS 之间不调用python的c api, 我们可以认为他是线程安全的. 因为此期间只要能保护共享资源. 我们就能说它是线程安全的.

python 子线程的销毁

在子线程完成了改完成的python代码后, Python就该对该线程进行销毁. Python里, 销毁主线程和销毁子线程有所不同, 主线程的销毁会包含销毁Python的运行时环境, 而子线程的销毁不需要.

子线程的销毁在函数 t_bootstrap 中, 而线程对象的创建是在构造 bootstate 结构体时创建 PyThreadState 的.

[_threadmodule.c]
static void t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    nb_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    if (res == NULL) {
        ...
    }
    else
        Py_DECREF(res);
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    nb_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}

python首先会通过 PyThreadState_Clear(tstate); 清理当前线程多对应的线程状态对象. 就是对线程对象中维护的东西进行引用计数的维护. 随后, 就该释放GIL了, 释放GIL的操作在 PyThreadState_DeleteCurrent() 中:

[pystate.c]
void PyThreadState_DeleteCurrent()
{
    PyThreadState *tstate = GET_TSTATE();
    tstate_delete_common(tstate);
    if (autoInterpreterState && PyThread_get_key_value(autoTLSkey) == tstate)
        PyThread_delete_key_value(autoTLSkey);
    SET_TSTATE(NULL);
    PyEval_ReleaseLock();
}

删除了当前的线程状态对象, 然后通过 PyEval_ReleaseLock() 释放GIL.

python线程用户级的互斥与同步

python线程在GIL的控制下, 限制对python提供的C API, 对这些API来说, 它们之间是互斥的. 但这种互斥用户不能控制, python显然还需要更高层次的互斥机制 -- 用户级互斥.

import threading
import time

lock = threading._allocate_lock()

def threadProc():
    while True:
        print("sub thread ", threading.get_ident())
        print("sub thread wait lock", threading.get_ident())
        lock.acquire()
        print("sub thread get lock", threading.get_ident())
        time.sleep(3)
        print("sub thread release lock", threading.get_ident())
        lock.release()
        time.sleep(1)

print("main thread ", threading.get_ident())
threading._start_new_thread(threadProc, ())
while True:
    print("main thread ", threading.get_ident())
    print("main thread wait lock", threading.get_ident())
    lock.acquire()
    print("main thread get lock", threading.get_ident())
    print("main thread release lock", threading.get_ident())
    lock.release()
    time.sleep(1)

当线程通过 lock.acquire() 获得lock之后, 独享执行代码的权利, 其他进程则要等它释放lock之后才会被python的线程调度唤醒. 这种机制给了用户控制线程之间的交互的能力, 是python中实现线程互斥和同步的中心.

Lock 对象

上面代码中, 我们通过 threading._allocate_lock() 创建了一个lock对象, 所以看看对应C函数 thread_PyThread_allocate_lock :

[_threadmodule.c]
static PyObject * thread_PyThread_allocate_lock(PyObject *self)
{
    return (PyObject *) newlockobject();
}

static lockobject * newlockobject(void)
{
    lockobject *self;
    self = PyObject_New(lockobject, &Locktype);
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    return self;
}

threading._allocate_lock() 仅仅只是创建了一个 lockobject 对象, 实际上, python的用户级线程同步机制都是在这个对象的基础上:

[pythread.h]
typedef void *PyThread_type_lock;

[_threadmodule.c]
typedef struct {
    PyObject_HEAD
    PyThread_type_lock lock_lock;
    PyObject *in_weakreflist;
    char locked; /* for sanity checking */
} lockobject;

显然 lockobject 也是一个PyObject对象, 这也是python的内建对象, 也有自己的方法:

[_threadmodule.c]
static PyMethodDef lock_methods[] = {
    {"acquire_lock", (PyCFunction)lock_PyThread_acquire_lock, ...},
    {"acquire",      (PyCFunction)lock_PyThread_acquire_lock, ...},
    {"release_lock", (PyCFunction)lock_PyThread_release_lock, ...},
    {"release",      (PyCFunction)lock_PyThread_release_lock, ...},
    {"locked_lock",  (PyCFunction)lock_locked_lock,           ...},
    {"locked",       (PyCFunction)lock_locked_lock,           ...},
    {"__enter__",    (PyCFunction)lock_PyThread_acquire_lock, ...},
    {"__exit__",    (PyCFunction)lock_PyThread_release_lock,  ...},
    {NULL,           NULL}              /* sentinel */
};

很多属性只是别名, 应该又是兼容旧版本吧. 总的操作也就三种: 加锁, 解锁, 判断是否被锁.

加锁

一个python在内核级访问python解释器之前, 都要先申请GIL, 同样的, 在用户级需要访问共享资源之前也要申请用户级lock, 这个申请动作在 lock.acquire() 中完成, 对应的C函数是 lock_PyThread_acquire_lock :

[_threadmodule.c]
static PyObject * lock_PyThread_acquire_lock(lockobject *self, PyObject *args, PyObject *kwds)
{
    _PyTime_t timeout;
    PyLockStatus r;

    // 设置等待时间 没设置时, timeout是个负数
    if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
        return NULL;

    r = acquire_timed(self->lock_lock, timeout);
    if (r == PY_LOCK_INTR) {
        return NULL;
    }

    if (r == PY_LOCK_ACQUIRED)
        self->locked = 1;
    return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}

lock对象是通过信号量 sem_t 进行线程间同步的, 通过 sem_wait C库函数进行等待. 自然, 可以想到它的release操作:

[_threadmodule.c]
static PyObject *
lock_PyThread_release_lock(lockobject *self)
{
    /* Sanity check: the lock must be locked */
    if (!self->locked) {
        PyErr_SetString(ThreadError, "release unlocked lock");
        return NULL;
    }

    PyThread_release_lock(self->lock_lock);
    self->locked = 0;
    Py_RETURN_NONE;
}

[thread_pthread.h]
void PyThread_release_lock(PyThread_type_lock lock)
{
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;
    (void) error;
    status = sem_post(thelock);
    CHECK_STATUS("sem_post");
}

高级线程标准库 threading

python的内建模块 _thread 提供了比较低级的线程控制工具, 为了简化多线程应用的开发, python在_thread的基础上构建了一个高级的线程控制库-- threading . 这个标准库也是最常见的多线程编程用到的库.

这个标准库的也有直接使用 _thread 的属性:

[threading.py]
import _thread
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error

threading.py 中, 有一套记录所有通过继承 threading.Thread 而创建的python线程机制. 这个机制通过两个dict和一个lock完成:

[threading.py]
# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
_limbo = {}

我们通过 threading.Thread 创建多线程, 有两个阶段, 第一个阶段是调用 threading.Thread.start , 而第二个阶段是在 threading.Thread.start 中调用run. 当处于第一阶段时, 还没有调用 _thread.start_new_thread 创建原生子线程, 这时候记录到 _limbo 中, 因为没有创建子线程, 所以没有子线程id, 记录的方式是 _limbo[self] = self . 第二阶段, 调用 _thread.start_new_thread 创建了原生子线程, 这是从 _limbo 中删除子线程,

with _active_limbo_lock:
    _active[self._ident] = self
    del _limbo[self]

_limbo 用来保存第一阶段, 就是还待创建子线程的Thread, _active 用来保存已经创建子线程的. 通过 enumerate() 可以用来访问他们两个的集合:

[threading.py]
def enumerate():
    """Return a list of all Thread objects currently alive.

    The list includes daemonic threads, dummy thread objects created by
    current_thread(), and the main thread. It excludes terminated threads and
    threads that have not yet been started.

    """
    with _active_limbo_lock:
        return list(_active.values()) + list(_limbo.values())

threading 的线程同步工具

threading 中提供了不同用于线程同步的工具, 以简化python用户实现多线程应用程序. 这些同步工具实际上都是建立在 _thread 的lock对象的基础上的. 其中一个是直接用 _thread的lock对象的:

[threading.py]
_allocate_lock = _thread.allocate_lock
Lock = _allocate_lock

在这个对象上和前面所描述的, 可以进行 acquire, release 等操作.

下面我们将介绍一些同步工具 -- RLock, Condition .

RLock

RLock对象是lock对象的一个变种, 其内部维护着一个lock对象, 但是它是一种可以重入lock. 一般而言, 对于lock对象要想连续进行两次 acquire 操作. 第二次acquire必须等待第一次加锁有release. 如果第一次加锁后没有release. 那么这个程序就会因此造成线程死锁. RLock对象则允许同一个线程多次对其进行acquire操作, 因为在其内部通过一个 counter 变量维护着线程 acquire 的次数. 而且每一次的 acquire 操作必须有一个 release 操作与之对应, 在所有的 release 操作都完成之后, 别的线程才能申请该RLock对象.

Condition

Condition 对象也是对lock对象的包装, 在创建 Condition 对象时, 其构造函数需要一个Lock对象作为参数, 如果没有则将自行创建一个RLock对象. 在 Condition 对象上, 也可以调用 acquirerelease 操作. 同时 Condition 还提供 waitnotify 的操作. 这两个操作有什么用呢.

假设有Condition对象C. 当线程A调用 C.wait() 时, 线程A将释放C中的lock对象, 并进入阻塞状态, 知道有别的程序调用 C.notify() , A 才会重新通过 acquire 重新申请C中的lock对象, 并退出 wait 操作.

这种的应用场景就是两个线程, 其中A线程要做的必须先让B线程完成某件事A才能继续执行, 这样的情况下就可以用 Condition 来解决.

Semaphore

Semaphore 对象内部维护这一个 Condition 对象, 对于管理一组共享资源非常有用. lock对象可以保护一个共享资源, 但是假如我们有一个共享资源池, 其中有5个共享资源A, 这意味着可以用5个线程同时自由地访问这些资源, 而如果使用Lock来对共享资源进行保护的话, 所有线程都是互斥的, 这使得会有4个人资源被浪费. Semaphore 正是来解决这样的问题. 它在 Condition 的基础上实现了对共享资源池的线程同步机制. Semaphore 提供了两个操作 acquirerelease , 都与Lock有相同的语义. 当线程调用 Semaphore.acquire 时, 如果共享资源中还有剩余的A时, 线程就会继续执行; 而如果资源池中已经没有可以用资源, 线程就会将自身挂起, 知道别的线程调用 Semaphore.release 释放一个资源.

Event

Event 也是对 Condition 的一个简单包装, 提供了独有的 set 和 wait 语义. 用于主线程控制其他线程的执行. 内部维护一个 flag 标记. 像红绿灯一样, 当标记设为 True 时, 会通知所有等待线程恢复运行.

threading 中的 Thread

我们经常用到 threading 中的一个重要组件 threading.Thread . 在它的实现中, 用到了我们前面说的许多机制. 特别的, 它实现 join 操作, 等待某个thread运行完后再继续执行的功能. 更加细节的东西, 请查看 Lib/threading.py .


本文由 hongweipeng 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

赏个馒头吧