21、Python3 多线程

作者: Brinnatt 分类: python 术 发布时间: 2023-03-30 10:33

21.1、并发

21.1.1、并发和并行区别

并行,parallel:同时做某些事,可以互不干扰的同一个时刻做几件事。

并发,concurrency:也是同时做某些事,但是强调,一个时段内有事情要处理。

21.1.2、并发的解决

食堂打饭模型:中午 12 点,开饭啦,大家都涌向食堂,这就是并发。如果人很多,就是高并发。

21.1.2.1、队列、缓冲区

假设只有一个窗口,陆续涌入食堂的人,排队打菜是比较好的方式。

所以,排队(队列)是一种天然解决并发的办法。排队就是把人排成队列,先进先出,解决了资源使用的问题。排成的队列,其实就是一个缓冲地带,就是缓冲区

假设女生优先,那么这个窗口就得是两队,只要有女生来就可以先打饭,男生队列等着,女生队伍就是一个优先队列

例如 queue 模块的类 Queue、LifoQueue、PriorityQueue。

21.1.2.2、争抢

只开一个窗口,有可能没有秩序,也就是谁挤进去就给谁打饭。挤到窗口的人占据窗口,直到打到饭菜离开。其他人继续争抢,会有一个人占据着窗口,可以视为锁定窗口,窗口就不能为其他人提供服务了。这是一种锁机制

谁抢到资源就上锁,排他性的锁,其他人只能等候。争抢也是一种高并发解决方案,但是,这样不好,因为有可能有人很长时间抢不到。

21.1.2.3、预处理

如果排长队的原因,是由于每个人打菜等候时间长,因为要吃的菜没有,需要现做,没打着饭不走开,锁定着窗口。

食堂可以提前统计大多数人最爱吃的菜品,将最爱吃的 80% 的热门菜,提前做好,保证供应 20% 的冷门菜,现做。

这样大多数人,就算锁定窗口,也很快就释放窗口了。

一种提前加载用户需要的数据的思路,预处理思想,缓存常用。

21.1.2.4、并行

成百上千人同时来吃饭,一个队伍搞不定的,多开打饭窗口形成多个队列,如同开多个车道一样,并行打菜。

开窗口就得扩大食堂,得多雇人在每一个窗口提供服务,造成成本上升。

日常可以通过购买更多服务器,或多开进程、线程实现并行处理,来解决并发问题。注意这些都是水平扩展思想。

注:如果线程在单 CPU 上处理,就不是并行了。但是多数服务器都是多 CPU 的,服务的部署往往是多机的、分布式的,这都是并行处理。

21.1.2.5、提速

提高单个窗口的打饭速度,也是解决并发的方式。打饭人员提高工作技能,或为单个窗口配备更多的服务人员,都是提速的办法。

提高单个 CPU 性能,或单个服务器安装更多的 CPU。这是一种 垂直扩展 思想。

21.1.2.6、消息中间件

上地、西二旗地铁 站外的九曲回肠的走廊,缓冲人流,进去之后再多口安检进站。

常见的消息中间件有 RabbitMQ、ActiveMQ ( Apache )、RocketMQ(阿里Apache)、kafka (Apache) 等。

当然还有其他手段解决并发问题,但是已经列举了最常用的解决方案,一般来说不同的并发场景用不同的策略,而策略可能是多种方式的优化组合。

21.2、进程和线程

在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个程序的执行实例就是一个进程。

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

进程和程序的关系:

程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源),它也是线程的容器。

Linux 进程有父进程、子进程;Windows 的进程是平等关系。

线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。

一个标准的线程由线程 ID,当前指令指针(PC),寄存器集合和堆栈组成。

在许多系统中,创建一个线程比创建一个进程快10-100 倍。

进程、线程的理解:

现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。

进程就是独立的王国,进程间不可以随便的共享数据。

线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。

21.2.1、线程的状态

状态 含义
就绪(Ready) 线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占。
运行(Running) 线程正在运行。
阻塞(Blocked) 线程等待外部事件发生而无法运行,如I/O操作。
终止(Terminated) 线程完成,或退出,或被取消。

21.2.2、Python 中的进程和线程

进程会启动一个解释器进程,线程共享一个解释器进程。

21.3、Python 线程开发

Python 的线程开发使用标准库 threading。

21.3.1、Thread 类

# 签名
def __init__(self, group=None, target=None, name=None, args=(),kwargs=None, *, daemon=None)
参数名 含义
target 线程调用的对象,就是目标函数
name 为线程起个名字
args 为目标函数传递实参,元组
kwargs 为目标函数关键字传参,字典

21.3.2、线程启动

import threading

# 待线程调用对象
def worker():
    print("I'm working")
    print('finished')

t = threading.Thread(target=worker, name='worker')  # 线程对象
t.start()  # 启动

通过 threading.Thread 创建一个线程对象,target 是目标函数,name 可以指定名称。但是线程没有启动,需要调用 start 方法。

线程之所以执行函数,是因为线程中就是执行代码的,而最简单的封装就是函数,所以还是函数调用。

函数执行完,线程也就退出了。那么,如果不让线程退出,或者让线程一直工作怎么办呢?

import threading
import time

# 待线程调用对象
def worker():
    while True:
        time.sleep(1)
        print("I'm working")

t = threading.Thread(target=worker, name='worker')  # 线程对象
t.start()  # 启动

21.3.3、线程退出

Python 没有提供线程退出的方法,线程在下面情况时退出:

1、线程函数内语句执行完毕。
2、线程函数中抛出未处理的异常。

import threading
import time

# 待线程调用对象
def worker():
    count = 0
    while True:
        if count > 5:
            break
        time.sleep(1)
        print("I'm working")
        count += 1

t = threading.Thread(target=worker, name='worker')  # 线程对象
t.start()  # 启动

print("=== End ===")

Python 的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了。

21.3.4、线程的参数

import threading
import time

# 待线程调用对象
def add(x, y):
    print('{} + {} = {}'.format(x, y, x + y, threading.current_thread().ident))

thread1 = threading.Thread(target=add, name='add', args=(4, 5))  # 线程对象
thread1.start()  # 启动
time.sleep(2)

thread2 = threading.Thread(target=add, name='add', args=(5,), kwargs={'y': 4})  # 线程对象
thread2.start()  # 启动
time.sleep(2)

thread3 = threading.Thread(target=add, name='add', kwargs={'x': 4, 'y': 5})  # 线程对象
thread3.start()  # 启动

线程传参和函数传参没什么区别,本质上就是函数传参。

21.3.5、threading 的属性和方法

名称 含义
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处于alive状态的线程个数
enumerate() 返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程
get_ident() 返回当前线程的ID,非0整数

active_count、enumerate 方法返回的值还包括主线程。

import threading
import time

def showthreadinfo():
    print('currentthread = {}'.format(threading.current_thread()))
    print('main thread = {}'.format(threading.main_thread()))
    print('active count= {}'.format(threading.active_count()))

def worker():
    count = 0
    showthreadinfo()

    while True:
        if count > 5:
            break
        time.sleep(1)
        count += 1
        print("I'm working")

t = threading.Thread(target=worker, name='worker')  # 线程对象
showthreadinfo()
t.start()  # 启动

print("===End===")

21.3.6、Thread 实例的属性和方法

名称 含义
name 只是一个名字,只是个标识,名称可以重名。getName()、setName()获取、设置这个名词。
ident 线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可以访问。此ID可以重复使用。
is_alive() 返回线程是否活着。

注意:线程的 name 这是一个名称,可以重复;ID 必须唯一,但可以在线程退出后再利用。

import threading
import time

def worker():
    count = 0

    while True:
        if count > 5:
            break
        time.sleep(1)
        count += 1
        print(1, 'in thread -->', threading.current_thread().name)

t = threading.Thread(target=worker, name='worker')  # 线程对象
print(t.ident)
t.start()  # 启动

while True:
    time.sleep(1)
    if t.is_alive():
        print(2, 'in main -->', '{} {} alive'.format(t.name, t.ident))
    else:
        print(2.1, 'in main -->', '{} {} dead'.format(t.name, t.ident))
        # t.start()  # RuntimeError: threads can only be started once
名称 含义
start() 启动线程。每一个线程必须且只能执行该方法一次。
run() 运行线程函数。

为了演示,派生一个 Thread 的子类。

21.3.6.1、start 方法

import threading
import time

def worker():
    count = 0

    while True:
        if count > 5:
            break
        time.sleep(1)
        count += 1
        print(1, 'in thread -->', 'worker running')

class MyThread(threading.Thread):
    def start(self):
        print(2, 'in son Thread -->', 'start ...')
        super().start()

    def run(self):
        print(3, 'in son Thread -->', 'run ...')
        super().run()

t = MyThread(name='worker', target=worker)
t.start()

运行结果:
2 in son Thread --> start ...
3 in son Thread --> run ...
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running

21.3.6.2、run 方法

import threading
import time

def worker():
    count = 0

    while True:
        if count > 5:
            break
        time.sleep(1)
        count += 1
        print(1, 'in thread -->', 'worker running')

class MyThread(threading.Thread):
    def start(self):
        print(2, 'in son Thread -->', 'start ...')
        super().start()

    def run(self):
        print(3, 'in son Thread -->', 'run ...')
        super().run()

t = MyThread(name='worker', target=worker)
# t.start()
t.run()

运行结果:
3 in son Thread --> run ...
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running

start() 方法会调用 run() 方法,而 run() 方法可以运行函数。

这两个方法看似功能重复了,这么看来留一个方法就可以了。是这样吗?

21.3.6.3、start 和 run 的区别

在线程函数中,增加打印线程的名字的语句,看看能看到什么信息。

import threading
import time

def worker():
    count = 0

    while True:
        if count > 5:
            break
        time.sleep(1)
        count += 1
        print(1, 'in thread -->', 'worker running')
        print(1.1, 'in thread -->', threading.current_thread().name)

class MyThread(threading.Thread):
    def start(self):
        print(2, 'in son Thread -->', 'start ...')
        super().start()

    def run(self):
        print(3, 'in son Thread -->', 'run ...')
        super().run()

t = MyThread(name='worker', target=worker)
# t.start()
t.run()  # 分别执行start或者run方法

使用 start 方法启动线程,启动了一个新的线程,名字叫做 worker。但是使用 run 方法,并没有启动新的线程,就是在主线程中调用了一个普通的函数而已。

因此,启动线程请使用 start 方法,才能启动多个线程。

21.3.7、多线程

顾名思义,多个线程,一个进程中如果有多个线程,就是多线程,实现一种并发。

import threading
import time

def worker():
    count = 0

    while True:
        if count > 5:
            break
        time.sleep(0.5)
        count += 1
        print(1, 'in thread -->', 'worker running')
        print(1.1, 'in thread -->', threading.current_thread().name, threading.current_thread().ident)

class MyThread(threading.Thread):
    def start(self):
        print(2, 'in son Thread -->', 'start ...')
        super().start()

    def run(self):
        print(3, 'in son Thread -->', 'run ...')
        super().run()

t1 = MyThread(name='工人1', target=worker)
t2 = MyThread(name='工人2', target=worker)
# t1.start()
# t2.start()
t1.run()
t2.run()

没有开启新的线程,这就是普通函数调用,所以执行完 t1.run(),然后执行 t2.run(),这里就不是多线程。

当使用 start 方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程。

一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。一个进程至少有一个主线程。

其他线程称为工作线程。

21.3.8、线程安全

IPython 中演示,python 命令行、pycharm 都不能演示出效果。

import threading

def worker():
    for _ in range(100):
        print("{} is running.".format(threading.current_thread().name))

for x in range(1, 5):
    name = "worker {}".format(x)
    t = threading.Thread(name=name, target=worker)
    t.start()
输出:
......
worker 4 is running.
worker 2 is running.worker 1 is running.
worker 1 is running.
worker 1 is running.worker 4 is running.
worker 4 is running.
......

看代码,应该是一行行打印,但是很多字符串打在了一起,为什么?

说明 print 函数被打断了,被线程切换打断了。print 函数分两步,第一步打印字符串,第二步换行,就在这之间,发生了线程的切换。

这说明 print 函数是线程不安全的。

线程安全:线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的。

上例中,本以为 print 应该是打印文本之后紧跟着一个换行的,但是有时候确实好几个文本在一起,后面跟上换行,而且发生这种情况的时机不确定。

所以,print 函数不是线程安全函数。如果是这样,多线程编程的时候,print 输出日志,不能保证一个输出后面一定立即换行了,怎么办?

21.3.8.1、不让print打印换行

import threading

def worker():
    for _ in range(100):
        print("{} is running.\n".format(threading.current_thread().name), end='')

for x in range(1, 5):
    name = "worker {}".format(x)
    t = threading.Thread(name=name, target=worker)
    t.start()

字符串是不可变的类型,它可以作为一个整体不可分割输出。end='' 就不再让 print 输出换行了。

21.3.8.2、使用logging

标准库里面的 logging 模块,日志处理模块,属于线程安全模块,生产环境代码都使用 logging。

import logging
import threading

def worker():
    for _ in range(100):
        # print("{} is running.\n".format(threading.current_thread().name), end='')
        logging.warning("{} is running.".format(threading.current_thread().name))

for x in range(1, 5):
    name = "worker {}".format(x)
    t = threading.Thread(name=name, target=worker)
    t.start()

21.3.9、daemon 和 non-daemon 线程

注意:这里的 daemon 不是 Linux 中的守护进程。

进程靠线程执行代码,至少有一个主线程,其它线程是工作线程。

主线程是第一个启动的线程。

父线程:如果线程 A 中启动了一个线程 B,A 就是 B 的父线程。

子线程:B 就是 A 的子线程。

Python 中,构造线程的时候,可以设置 daemon 属性,这个属性必须在 start 方法前设置好。

# 源码Thread的__init__方法中
if daemon is not None:
    self._daemonic = daemon
else:
    self._daemonic = current_thread().daemon
self._ident = None

线程 daemon 属性,如果设定就是用户的设置,否则就取当前线程的 daemon 值。

主线程是 non-daemon 线程,即 daemon = False。

import time
import threading

def foo():
    time.sleep(5)
    for i in range(20):
        print(i)

# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=False)
t.start()
print('Main Thread Exiting')

发现线程 t 依然执行,主线程已经执行完,但是一直等着线程 t。

修改为 t = threading.Thread(target=foo,daemon=True) 试一试。程序立即结束了,根本没有等线程 t。

名称 含义
daemon属性 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常
isDaemon() 是否是daemon线程
setDaemon 设置为daemon线程,必须在start方法之前设置

总结:

线程具有一个 daemon 属性,可以显示设置为 True 或 False,也可以不设置,则取默认值 None。

如果不设置 daemon,就取当前线程的 daemon 来设置它。主线程是 non-daemon 线程,即 daemon = False。

从主线程创建的所有线程不设置 daemon 属性,则默认都是 daemon = False,也就是 non-daemon 线程。

Python 程序在没有活着的 non-daemon 线程运行时随主线程退出,也就是剩下的只能是 daemon 线程,主线程才能退出,否则主线程就只能等待。

思考下面程序的输出是什么?

import time
import threading

def bar():
    time.sleep(10)
    print('bar')

def foo():
    for i in range(20):
        print(i)
    t = threading.Thread(target=bar, daemon=False)
    t.start()

# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=True)
t.start()
print('Main Thread Exiting')

上例中,会不会输出 bar 这个字符串,如果没有,如何修改才能打印出来?

time.sleep(2)
print('Main Thread Exiting')

再看一个例子:

import time
import threading

def foo(n):
    for i in range(n):
        print(i)
        time.sleep(1)

# 主线程是non-daemon线程
t1 = threading.Thread(target=foo, args=(20,), daemon=True)  # 调换10和20看看效果

t1.start()

t2 = threading.Thread(target=foo, args=(10,), daemon=False)
t2.start()

time.sleep(2)
print('Main Thread Exiting')

上例说明,如果有 non-daemon 线程的时候,主线程退出时,也不会杀掉所有 daemon 线程,直到所有 non-daemon 线程全部结束;如果还有 daemon 线程,主线程需要退出,会结束所有 daemon 线程,退出。

21.3.10、join 方法

先看一个简单的例子,看看效果:

import time
import threading

def foo(n):
    for i in range(n):
        print(i)
        time.sleep(1)

t1 = threading.Thread(target=foo, args=(10,), daemon=True)
t1.start()

t1.join()  # 设置join,取消join对比一下
print('Main Thread Exiting')

使用了 join 方法后,daemon 线程执行完了,主线程才退出了。

join(timeout=None),是线程的标准方法之一。

一个线程中调用另一个线程的 join 方法,调用者将被阻塞,直到被调用线程终止。

一个线程可以被 join 多次。

timeout 参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束。

调用谁的 join 方法,就是 join 谁,就要等谁。

21.3.11、daemon 线程应用场景

简单来说就是,本来并没有 daemon thread,为了简化程序员的工作,让他们不用去记录和管理那些后台线程,创造了一个 daemon thread 的概念。这个概念唯一的作用就是,当你把一个线程设置为 daemon,它会随主线程的退出而退出。

主要应用场景有:

1、后台任务。如发送心跳包、监控,这种场景最多。

2、主线程工作才有用的线程。如主线程中维护着公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作也没有意义了,一起退出最合适。

3、随时可以被终止的线程。

如果主线程退出,想所有其它工作线程一起退出,就使用 daemon=True 来创建工作线程。

比如,开启一个线程定时判断 WEB 服务是否正常工作,主线程退出,工作线程也没有必须存在了,应该随着主线程退出一起退出。这种 daemon 线程一旦创建,就可以忘记它了,只需关心主线程什么时候退出就行了。

daemon 线程,简化了程序员手动关闭线程的工作。

如果在 non-daemon 线程 A 中,对另一个 daemon 线程 B 使用了 join 方法,这个线程 B 设置成 daemon 就没有什么意义了,因为 non-daemon 线程 A 总是要等待 B。

如果在一个 daemon 线程 C 中,对另一个 daemon 线程 D 使用了 join 方法,只能说明 C 要等待 D,主线程退出,C 和 D 不管是否结束,也不管它们谁等谁,都要被杀掉。

举例:

import time
import threading

def bar():
    while True:
        time.sleep(1)
        print('bar')

def foo():
    print("t1's daemon = {}".format(threading.current_thread().daemon))
    t2 = threading.Thread(target=bar)
    t2.start()
    print("t2's daemon = {}".format(t2.daemon))

t1 = threading.Thread(target=foo, daemon=True)
t1.start()

time.sleep(3)
print("Main Thread Exiting")

上例,只要主线程退出,2 个工作线程都结束。可以使用 join,让线程结束不了,怎么做?

import time
import threading

def bar():
    while True:
        time.sleep(1)
        print('bar')

def foo():
    print("t1's daemon = {}".format(threading.current_thread().daemon))
    t2 = threading.Thread(target=bar)
    t2.start()
    print("t2's daemon = {}".format(t2.daemon))
    t2.join()

t1 = threading.Thread(target=foo, daemon=True)
t1.start()

t1.join()
print("Main Thread Exiting")

21.3.12、threading.local 类

import threading
import time

# 局部变量实现
def worker():
    x = 0
    for i in range(100):
        time.sleep(0.0001)
        x += 1

    print(threading.current_thread(), x)

for _ in range(10):
    threading.Thread(target=worker).start()

上例使用多线程,每个线程完成不同的计算任务。x 是局部变量。能否改造成使用全局变量完成。

import threading
import time

class A:
    def __init__(self):
        self.x = 0

# 全局对象
global_data = A()

def worker():
    global_data.x = 0
    for i in range(100):
        time.sleep(0.0001)
        global_data.x += 1
    print(threading.current_thread(), global_data.x)

for _ in range(10):
    threading.Thread(target=worker).start()

上例虽然使用了全局对象,但是线程之间互相干扰,导致了错误的结果。

能不能使用全局对象,还能保持每个线程使用不同的数据呢?

python 提供 threading.local 类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储的数据其他线程看不见。

import threading
import time

# 全局对象
global_data = threading.local()

def worker():
    global_data.x = 0
    for i in range(100):
        time.sleep(0.0001)
        global_data.x += 1
    print(threading.current_thread(), global_data.x)

for _ in range(10):
    threading.Thread(target=worker).start()

结果显示和使用局部变量的效果一样。再看 threading.local 的例子。

import threading

X = 'abc'
ctx = threading.local()
ctx.x = 123

print(ctx, type(ctx), ctx.x)

def worker():
    print(X)
    print(ctx)
    print(ctx.x)
    print('working')

worker()  # 普通函数调用
print('-' * 80)
threading.Thread(target=worker).start()  # 另起一个线程

从运行结果来看,另起一个线程打印 ctx.x 出错了。

AttributeError: '_thread._local' object has no attribute 'x'

但是,ctx 打印没有出错,说明看到 ctx,但是 ctx 中的 x 看不到,这个 x 不能跨线程。

threading.local 类构建了一个大字典,其元素是每个线程实例的地址为 key 和线程对象引用线程单独的字典的映射,如下:

id(Thread) -> (ref(Thread),thread-local dict)

通过 threading.local 实例就可在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全。

21.3.13、Timer 定时器

threading.Timer 继承自 Thread,这个类用来定义多久执行一个函数。

class threading.Timer(interval, function, args=None, kwargs=None)

start 方法执行之后,Timer 对象会处于等待状态,等待了 interval 之后,开始执行 function 函数。

如果在执行函数之前的等待阶段,使用了 cancel 方法,就会跳过执行函数结束。

import threading
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def worker():
    logging.info('in worker')
    time.sleep(2)

t = threading.Timer(5, worker)
t.name = 'w1'
t.start()  # 启动线程
print(threading.enumerate())
t.cancel()  # 取消,可以注释这一句看看如何定时执行
time.sleep(1)
print(threading.enumerate())

如果线程中 worker 函数已经开始执行,cancel 就没有任何效果了。

总结:Timer 是线程 Thread 的子类,就是线程类,具有线程的能力和特征。

它的实例是能够延时执行目标函数的线程,在真正执行目标函数之前,都可以 cancel 它。

提前 cancel:

import threading
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def worker():
    logging.info('in worker')
    time.sleep(2)

t = threading.Timer(5, worker)
t.name = 'w1'
t.cancel()  # 提前取消
t.start()   # 启动线程
t.join()    # 就算join也没用
print(threading.enumerate())
print(threading.enumerate())

21.4、线程同步

线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

不同操作系统实现技术有所不同,有临界区( Critical Section )、互斥量( Mutex)、信号量( Semaphore)、事件 Event 等。

21.4.1、Event

Event 事件,是线程间通信机制中最简单的实现,使用一个内部的标记 flag,通过 flag 的 True 或 False 的变化来进行操作。

名称 含义
set() 标记设置为True
clear() 标记设置为False
is_set() 标记是否为True
wait(timeout=None) 设置等待标记为True的时长,None为无限等待。等到返回True,未等到超时了返回False

需求:老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到生产了 10 个杯子。

from threading import Event, Thread
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def boss(event: Event):
    logging.info("I'm boss, waiting for you")
    # 等待
    event.wait()
    logging.info("Good Job")

def worker(event: Event, count=10):
    logging.info("I'm working for U.")
    cups = []
    while True:
        logging.info('make 1')
        time.sleep(0.5)
        cups.append(1)
        if len(cups) >= count:
            # 通知
            event.set()
            break
    logging.info("I finished my job. cups={}".format(cups))

event = Event()
w = Thread(target=worker, args=(event,))
b = Thread(target=boss, args=(event,))
w.start()
b.start()

总结:使用同一个 Event 对象的标记 flag。谁 wait 就是等到 flag 变为 True,或等到超时返回 False。不限制等待的个数。

wait 的使用:

from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def do(event: Event, interval: int):
    while not event.wait(interval):
        logging.info('do sth.')

e = Event()
Thread(target=do, args=(e, 3)).start()
e.wait(10)
e.set()
print('main exit')

Event 的 wait 优于 time.sleep,它会更快的切换到其它线程,提高并发效率。

Event 练习:实现 Timer,延时执行的线程,延时计算add(x, y)

思路:Timer 的构造函数中参数得有哪些?如何实现start启动一个线程执行函数?如何cancel取消待执行任务?

# 思路实现
from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def add(x: int, y: int):
    logging.info(x + y)

class Timer:
    def __init__(self, fn, interval, *args, **kwargs):
        pass

    def start(self):
        pass

    def cancel(self):
        pass

完整实现:

import datetime
from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def add(x: int, y: int):
    logging.info(x + y)

class Timer:
    def __init__(self, fn, interval, *args, **kwargs):
        self.fn = fn
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self.event = Event()

    def start(self):
        Thread(target=self.__run).start()

    def cancel(self):
        self.event.set()

    def __run(self):
        start = datetime.datetime.now()
        logging.info('waiting')

        self.event.wait(self.interval)
        if not self.event.is_set():
            self.fn(*self.args, **self.kwargs)
        delta = (datetime.datetime.now() - start).total_seconds()
        if not self.event.is_set():
            logging.info('You took {}s and finished'.format(delta))
            self.event.set()
        else:
            logging.info('You took {}s but not finished'.format(delta))

t = Timer(add, 10, 4, 50)
t.start()

e = Event()
e.wait(4)
# t.cancel()
print('======MainThread exit======')

21.4.2、Lock

锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。

需求:订单要求生产1000个杯子,组织10个工人生产。

import threading
from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

cups = []

def worker(count=10):
    logging.info("I'm working for U.")
    while len(cups) < count:
        time.sleep(0.0001)
        cups.append(1)
    logging.info('I finished. cups={}'.format(len(cups)))

for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

输出:
2023-03-22 11:00:31,993 Thread-1 (worker) 6352 I'm working for U.
2023-03-22 11:00:31,993 Thread-2 (worker) 9980 I'm working for U.
2023-03-22 11:00:31,994 Thread-3 (worker) 11612 I'm working for U.
2023-03-22 11:00:31,994 Thread-4 (worker) 1164 I'm working for U.
2023-03-22 11:00:31,994 Thread-5 (worker) 6628 I'm working for U.
2023-03-22 11:00:31,995 Thread-6 (worker) 10912 I'm working for U.
2023-03-22 11:00:31,995 Thread-7 (worker) 8140 I'm working for U.
2023-03-22 11:00:31,996 Thread-8 (worker) 6844 I'm working for U.
2023-03-22 11:00:31,996 Thread-9 (worker) 9240 I'm working for U.
2023-03-22 11:00:31,996 Thread-10 (worker) 5268 I'm working for U.
2023-03-22 11:00:32,186 Thread-8 (worker) 6844 I finished. cups=1000
2023-03-22 11:00:32,187 Thread-6 (worker) 10912 I finished. cups=1001
2023-03-22 11:00:32,187 Thread-2 (worker) 9980 I finished. cups=1002
2023-03-22 11:00:32,187 Thread-7 (worker) 8140 I finished. cups=1003
2023-03-22 11:00:32,188 Thread-1 (worker) 6352 I finished. cups=1004
2023-03-22 11:00:32,188 Thread-9 (worker) 9240 I finished. cups=1005
2023-03-22 11:00:32,189 Thread-3 (worker) 11612 I finished. cups=1006
2023-03-22 11:00:32,189 Thread-5 (worker) 6628 I finished. cups=1007
2023-03-22 11:00:32,189 Thread-10 (worker) 5268 I finished. cups=1008
2023-03-22 11:00:32,190 Thread-4 (worker) 1164 I finished. cups=1009

从上例的运行结果看出,多线程调度,导致了判断失误,多生产了杯子。如何修改?

Lock:加锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。

名称 含义
acquire(blocking=True, timeout=-1) 默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置。成功获取锁,返回True,否则返回False
release() 释放锁。可以从任何线程调用释放。
import threading
from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

cups = []
lock = Lock()

def worker(count=10):
    logging.info("I'm working for U.")
    # lock.acquire()
    # while len(cups) < count:
    #     time.sleep(0.0001)
    #     cups.append(1)
    # lock.release()
    flag = False
    while True:
        lock.acquire()
        if len(cups) >= count:
            flag = True
        # lock.release() 这里释放锁如何?
        time.sleep(0.0001)
        if not flag:
            cups.append(1)
        lock.release() # 这里释放锁如何?
        if flag:
            break
    logging.info('I finished. cups={}'.format(len(cups)))

for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

21.4.3、Lock 释放

计数器类,可以加,可以减。

一般来说,加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦出现异常,锁无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。

加锁、解锁常用语句:

1、使用 try...finally 语句保证锁的释放。

2、with 上下文管理,锁对象支持上下文管理。

改造 Couter 类,如下:

import threading
from threading import Thread, Lock

class Counter:
    def __init__(self):
        self._val = 0
        self.__lock = Lock()

    @property
    def value(self):
        with self.__lock:
            return self._val

    def inc(self):
        try:
            self.__lock.acquire()
            self._val += 1
        finally:
            self.__lock.release()

    def dec(self):
        with self.__lock:
            self._val -= 1

def run(c: Counter, count=100):
    for _ in range(count):
        for i in range(-50, 50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10
c2 = 1000
for i in range(c1):
    Thread(target=run, args=(c, c2)).start()

while True:
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(c.value)
        break
    else:
        print(threading.enumerate())

21.4.4、锁的应用场景

锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

如果全部都是读取同一个共享资源需要锁吗?不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁。

使用锁的注意事项:

  • 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。

  • 加锁时间越短越好,不需要就立即释放锁。

  • 一定要避免死锁。

不使用锁,有了效率,但是结果是错的;使用了锁,效率低下,但是结果是对的。所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧。

21.4.5、非阻塞锁使用

import threading
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False):  # 获取锁则返回True
            logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))
            # 适当的时机释放锁,为了演示不释放
        else:
            logging.info('{} {} is working'.format(threading.current_thread(), task.name))

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task('task-{}'.format(x)) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, name='worker-{}'.format(i), args=(tasks,)).start()

21.4.6、可重入锁 RLock

可重入锁,是线程相关的锁。

线程 A 获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程 A 中做和 acquire 次数相同的 release。

import threading
import time

lock = threading.RLock()
print(lock.acquire())
print('-' * 80)
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
# print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {} '.format(threading.current_thread().ident))
print('lock in main thread {}'.format(lock))  # 注意观察lock对象的信息
lock.release()
# lock.release() # 多了一次
print('-' * 80)

print(lock.acquire(blocking=False))  # 1次
# threading.Timer(3, lambda x: x.release(), args=(lock,)).start() # 跨线程了,异常
lock.release()
print('-' * 80)

# 测试多线程
print(lock.acquire())

def sub(l):
    print('{}: {}'.format(threading.current_thread(), l.acquire()))  # 阻塞
    print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
    print('lock in sub thread {}'.format(lock))
    l.release()
    print('sub 1')
    l.release()
    print('sub 2')

threading.Timer(2, sub, args=(lock,)).start()  # 传入同一个lock对象
print('+' * 80)

print(lock.acquire())
lock.release()
time.sleep(5)
print("释放主线程锁")
lock.release()

可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞获取锁。当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。

21.4.7、Condition

构造方法 Condition(lock=None),可以传入一个 Lock 或 RLock 对象,默认是 RLock。

名称 含义
acquire(*args) 获取锁
wait(self, timeout=None) 等待或超时
notify(n=1) 唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作
notify_all() 唤醒所有等待的线程

Condition 用于生产者、消费者模型,为了解决生产者消费者速度匹配问题。

先看一个例子,消费者消费速度大于生产者生产速度。

from threading import Thread, Event
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

# 只作演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event()  # event 只是为了使用方便,与逻辑无关

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("received {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

这个例子采用了消费者主动消费,消费者浪费了大量时间,主动来查看有没有数据。

能否换成一种通知机制,有数据通知消费者来消费呢?使用 Condition 对象。

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

# 只作演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event()  # event 只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)  # 模拟产生数据速度
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info("received {}".format(self.data))
                self.data = None
            self.event.wait(0.5)  # 模拟消费的速度

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

上例中,消费者等待数据,如果生产者准备好了会通知消费者消费,省得消费者反复来查看数据是否就绪。

如果是 1 个生产者,多个消费者怎么改?

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

# 只作演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event()  # event 只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)  # 模拟产生数据速度
        self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()  # 阻塞等通知
                logging.info("received {}".format(self.data))
            self.event.wait(0.5)  # 模拟消费的速度

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')

# 增加消费者
for i in range(5):
    c = Thread(target=d.consume, name='consumer-{}'.format(i))
    c.start()
p.start()

self.cond.notify_all() 发通知,修改为:self.cond.notify(n=2) 试一试看看结果?

这个例子,可以看到实现了消息的 一对多,这其实就是 广播 模式。

注:上例中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解 Condition 的使用,和生产者消费者模型。

Condition 用于生产者消费者模型中,解决生产者消费者速度匹配的问题。采用了通知机制,非常有效率。

使用方式:

使用 Condition,必须先 acquire,用完了要 release,因为内部使用了锁,默认使用 RLock 锁,最好的方式是使用 with 上下文。

消费者 wait,等待通知:

生产者生产好消息,对消费者发通知,可以使用 notify 或者 notify_all 方法。

21.4.8、Barrier

有人翻译成栅栏,我建议使用 屏障,可以想象成路障、道闸。3.2 引入 Python 的新功能。

名称 含义
Barrier(parties, action=None, timeout=None) 构建 Barrier 对象,指定参与方数目。timeout 是 wait 方法未指定超时的默认值。
n_waiting 当前在屏障中等待的线程数。
parties 各方数,就是需要多少个等待。
wait(timeout=None) 等待通过屏障。返回 0 到 (线程数 - 1) 的整数,每个线程返回不同。如果 wait 方法设置了超时,并超时发生,屏障将处于 broken 状态。
import threading
import logging

FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier')

barrier = threading.Barrier(3)

for x in range(3):  # 改成4,5,6试一试
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()
    threading.Event().wait(1)

logging.info('started')

运行结果:
2023-03-22 19:53:38,635  [worker-0,     1124] waiting for 0 threads.
2023-03-22 19:53:39,635  [worker-1,      944] waiting for 1 threads.
2023-03-22 19:53:40,637  [worker-2,     1708] waiting for 2 threads.
2023-03-22 19:53:40,637  [worker-2,     1708] after barrier 2
2023-03-22 19:53:40,637  [worker-0,     1124] after barrier 0
2023-03-22 19:53:40,637  [worker-1,      944] after barrier 1
2023-03-22 19:53:41,637  [MainThread,     2092] started

从运行结果看出:

所有线程冲到了 Barrier 前等待,直到到达 parties 的数目,屏障打开,所有线程停止等待,继续执行。

再有线程 wait,屏障继续阻拦,直到参与方数目达到设定值。

举例,赛马比赛中所有马匹就位,开闸。下一批马陆续来到闸门前等待比赛。

名称 含义
broken 如果屏障处于打破的状态,返回 True。
abort() 将屏障置于 broken 状态,等待中的线程或者调用等待方法的线程都会抛出 BrokenBarrierError 异常,直到 reset 方法来恢复屏障。
reset() 恢复屏障,重新开始拦截。
import threading
import logging

FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier.')

barrier = threading.Barrier(3)

for x in range(0, 9):
    if x == 2:
        barrier.abort()
    elif x == 6:
        barrier.reset()
    threading.Event().wait(1)
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()

上例中,屏障中等待了 2 个,屏障就被 broken 了,waiting 的线程抛了 BrokenBarrierError 异常,新 wait 的线程也抛异常,直到屏障恢复,才继续按照 parties 数目要求继续拦截线程。

21.4.8.1、wait 方法超时

如果 wait 方法超时发生,屏障将处于 broken 状态,直到 reset。

import threading
import logging

FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(barrier: threading.Barrier, i: int):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        logging.info(barrier.broken)  # 是否broken
        if i < 3:
            barrier_id = barrier.wait(1)  # 超时后,屏障broken
        else:
            if i == 6:
                barrier.reset()  # 恢复屏障
            barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier. run.')

barrier = threading.Barrier(3)

for x in range(0, 9):
    threading.Event().wait(2)
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier, x)).start()

21.4.8.2、barrier 应用

并发初始化:

所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能正常工作。

10 个线程做 10 种工作准备,每个线程负责一种工作,只有这 10 个线程都完成后,才能继续工作,先完成的要等待后完成的线程。

例如,启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有都满足了,程序才能继续向后执行。假设数据库连接失败,则初始化工作失败,就要 abort,barrier 置为 broken,所有线程收到异常退出。

工作量:

有 10 个计算任务,完成 6 个,就算工作完成。

21.4.9、semaphore 信号量

和 Lock 很像,信号量对象内部维护一个倒计数器,每一次 acquire 都会减 1,当 acquire 方法发现计数为 0 就阻塞请求的线程,直到其它线程对信号量 release 后,计数大于 0,放行阻塞的线程。

名称 含义
Semaphore(value=1) 构造方法。value 小于 0,抛 ValueError 异常。
acquire(blocking=True, timeout=None) 获取信号量,计数器减 1,获取成功返回 True。
release() 释放信号量,计数器加 1。

计数器永远不会低于 0,因为 acquire 的时候,发现是 0,都会被阻塞。

import threading
import logging
import time

FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(s: threading.Semaphore):
    logging.info('4 --> in sub thread')
    logging.info(s.acquire())  # 阻塞
    logging.info('sub thread over')

# 信号量
s = threading.Semaphore(3)
logging.info(s.acquire())
print(1, '-->', s._value)
logging.info(s.acquire())
print(2, '-->', s._value)
logging.info(s.acquire())
print(3, '-->', s._value)

threading.Thread(target=worker, args=(s,)).start()
time.sleep(2)

logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3))  # 阻塞3秒

# 释放
logging.info('released')
s.release()

应用举例:

连接池:因为资源有限,且开启一个连接成本高,所以,使用连接池。

一个简单的连接池:连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。

class Conn:
    def __init__(self, name):
        self.name = name

class Pool:
    def __init__(self, count: int):
        self.count = count
        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]

    def _connect(self, conn_name):
        # 创建连接的方法,返回一个名称

        return Conn(conn_name)

    def get_conn(self):
        # 从池中拿走一个连接
        if len(self.pool) > 0:
            return self.pool.pop()

    def return_conn(self, conn: Conn):
        # 向池中添加一个连接
        self.pool.append(conn)

真正的连接池的实现比上面的例子要复杂的多,这里只是简单的一个功能的实现。

本例中,get_conn() 方法在多线程的时候有线程安全问题。假设池中正好有一个连接,有可能多个线程判断池的长度是大于 0 的,当一个线程拿走了连接对象,其他线程再来 pop 就会抛异常的。如何解决?

1、加锁,在读写的地方加锁。
2、使用信号量 Semaphore。

使用信号量对上例进行修改:

import threading
import logging
import random

FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class Conn:
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name

class Pool:
    def __init__(self, count: int):
        self.count = count
        # 池中是连接对象的列表
        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]
        self.semaphore = threading.Semaphore(count)

    def _connect(self, conn_name):
        # 创建连接的方法,返回一个名称

        return Conn(conn_name)

    def get_conn(self):
        # 从池中拿走一个连接
        print('-' * 40)
        self.semaphore.acquire()
        print('-' * 40)
        conn = self.pool.pop()
        return conn

    def return_conn(self, conn: Conn):
        # 向池中添加一个连接
        self.pool.append(conn)
        self.semaphore.release()

# 连接池初始化
pool = Pool(3)

def worker(pool: Pool):
    conn = pool.get_conn()
    logging.info(conn)

    # 模拟使用了一段时间
    threading.Event().wait(random.randint(1, 4))
    pool.return_conn(conn)

for i in range(6):
    threading.Thread(target=worker, name="worker-{}".format(i), args=(pool,)).start()

上例中,使用信号量解决资源有限的问题。

如果池中有资源,请求者获取资源时信号量减 1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加 1,等待线程就可以被唤醒拿走资源。

注意:这个例子不能用到生成环境,只是为了说明信号量使用的例子,还有很多未完成功能。

21.4.9.1、semaphore 缺陷

假设如果还没有使用信号量,就 release,会怎么样?

import logging
import threading

sema = threading.Semaphore(3)
logging.warning(sema.__dict__)
for i in range(3):
    sema.acquire()

logging.warning('-------------------------')
logging.warning(sema.__dict__)

for i in range(4):
    sema.release()

logging.warning(sema.__dict__)

for i in range(3):
    sema.acquire()

logging.warning('---------------------------')
logging.warning(sema.__dict__)
sema.acquire()
logging.warning('------------------------------')
logging.warning(sema.__dict__)

从上例输出结果可以看出,竟然内置计数器达到了 4,这样实际上超出我们的最大值,需要解决这个问题。

21.4.9.2、BoundedSemaphore 类

有界的信号量,不允许使用 release 超出初始值的范围,否则,抛出 ValueError 异常。

import logging
import threading

sema = threading.BoundedSemaphore(3)
logging.warning(sema.__dict__)
for i in range(3):
    sema.acquire()

logging.warning('-' * 80)
logging.warning(sema.__dict__)

sema.release()
sema.release()
sema.release()
logging.warning(sema.__dict__)

# sema.release() # ValueError: Semaphore released too many times

这样用有界信号量修改源代码,保证如果多 return_conn 就会抛异常。

如果归还了同一个连接多次怎么办,去重很容易判断出来。

# 如果使用了信号量,但是还没有用完
self.pool.append(conn)
self.semaphore.release()

假设一种极端情况,计数器还差 1 就满了,有三个线程 A、B、C 都执行了第一句,都没有来得及 release,这时候轮到线程 A release,正常的 release,然后轮到线程 C release,一定出问题,超界了,直接抛异常。

因此信号量,可以保证,一定不能多归还。

锁:只允许同一个时间一个线程独占资源。它是特殊的信号量,即信号量计数器初值为 1。

信号量:可以多个线程访问共享资源,但这个共享资源数量有限。

21.4.10、数据结构安全

Queue:标准库 queue 模块,提供 FIFO 的 Queue、LIFO 的队列、优先队列。

Queue 类是线程安全的,适用于多线程间安全的交换数据。内部使用了 Lock 和 Condition。

为什么讲魔术方法时,说实现容器的大小,不准确?如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走,就有可能被其他线程改了。

Queue 类的 size 虽然加了锁,但是,依然不能保证立即 get、put 就能成功,因为读取大小和 get、put 方法是分开的。

import queue

q = queue.Queue(8)

if q.qsize() == 7:
    q.put()  # 上下两句可能被打断

if q.qsize() == 1:
    q.get()  # 未必会成功

这里只是拿 Queue 作为一个例子说明,所有的数据结构,在多线程共享的时候,特别要考虑线程安全问题,否则很容易造成数据不一致。

21.5、GIL 全局解释器锁

CPython 在解释器进程级别有一把锁,做 GIL 全局解释器锁。

GIL 保证 CPython 进程中,只有一个线程执行字节码。甚至是在多核 CPU 的情况下,也是如此。

CPython 中:

IO 密集型,由于线程阻塞,就会调度其他线程;

CPU 密集型,当前线程可能会连续的获得 GIL,导致其它线程几乎无法使用 CPU。

在 CPython 中由于有 GIL 存在,IO 密集型,使用多线程;CPU 密集型,使用多进程,绕开 GIL。

新版 CPython 正在努力优化 GIL 的问题,但不是移除。如果非要强调多线程的效率问题,请绕行,选择其它语言 erlang、Go 等。

Python 中绝大多数内置数据结构的读写都是原子操作。

由于 GIL 的存在,Python 的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型的。

保留 GIL 的原因:Guido 坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用 Python。而且移除 GIL,会降低 CPython 单线程的执行效率。

测试下面 2 个程序:

import logging
import datetime

logging.basicConfig(level=logging.INFO, format='%(thread)s %(message)s')
start = datetime.datetime.now()

def calc():
    total = 0
    for _ in range(1000000000):
        total += 1

calc()
calc()
calc()
calc()
calc()

delta = (datetime.datetime.now() - start).total_seconds()
logging.info(delta)
import logging
import datetime
import threading

logging.basicConfig(level=logging.INFO, format='%(thread)s %(message)s')
start = datetime.datetime.now()

def calc():
    total = 0
    for _ in range(1000000000):
        total += 1

t1 = threading.Thread(target=calc)
t2 = threading.Thread(target=calc)
t3 = threading.Thread(target=calc)
t4 = threading.Thread(target=calc)
t5 = threading.Thread(target=calc)

t1.start()
t2.start()
t3.start()
t4.start()
t5.start()

t1.join()
t2.join()
t3.join()
t4.join()
t5.join()

delta = (datetime.datetime.now() - start).total_seconds()
logging.info(delta)

从两段程序测试的结果来看,CPython 中多线程根本没有任何优势,和一个线程执行时间相当。

因为 GIL 的存在,尤其是像上面的计算密集型程序。

标签云