21、Python3 多线程
21.1、并发
21.1.1、并发和并行区别
并行,parallel:同时做某些事,可以互不干扰的同一个时刻做几件事。
并发,concurrency:也是同时做某些事,但是强调,一个时段内有事情要处理。
21.1.2、并发的解决
食堂打饭模型:中午 12 点,开饭啦,大家都涌向食堂,这就是并发。如果人很多,就是高并发。
21.1.2.1、队列、缓冲区
假设只有一个窗口,陆续涌入食堂的人,排队打菜是比较好的方式。
所以,排队(队列)是一种天然解决并发的办法。排队就是把人排成队列,先进先出,解决了资源使用的问题。排成的队列,其实就是一个缓冲地带,就是缓冲区。
假设女生优先,那么这个窗口就得是两队,只要有女生来就可以先打饭,男生队列等着,女生队伍就是一个优先队列。
例如 queue 模块的类 Queue、LifoQueue、PriorityQueue。
21.1.2.2、争抢
只开一个窗口,有可能没有秩序,也就是谁挤进去就给谁打饭。挤到窗口的人占据窗口,直到打到饭菜离开。其他人继续争抢,会有一个人占据着窗口,可以视为锁定窗口,窗口就不能为其他人提供服务了。这是一种锁机制。
谁抢到资源就上锁,排他性的锁,其他人只能等候。争抢也是一种高并发解决方案,但是,这样不好,因为有可能有人很长时间抢不到。
21.1.2.3、预处理
如果排长队的原因,是由于每个人打菜等候时间长,因为要吃的菜没有,需要现做,没打着饭不走开,锁定着窗口。
食堂可以提前统计大多数人最爱吃的菜品,将最爱吃的 80% 的热门菜,提前做好,保证供应 20% 的冷门菜,现做。
这样大多数人,就算锁定窗口,也很快就释放窗口了。
一种提前加载用户需要的数据的思路,预处理思想,缓存常用。
21.1.2.4、并行
成百上千人同时来吃饭,一个队伍搞不定的,多开打饭窗口形成多个队列,如同开多个车道一样,并行打菜。
开窗口就得扩大食堂,得多雇人在每一个窗口提供服务,造成成本上升。
日常可以通过购买更多服务器,或多开进程、线程实现并行处理,来解决并发问题。注意这些都是水平扩展思想。
注:如果线程在单 CPU 上处理,就不是并行了。但是多数服务器都是多 CPU 的,服务的部署往往是多机的、分布式的,这都是并行处理。
21.1.2.5、提速
提高单个窗口的打饭速度,也是解决并发的方式。打饭人员提高工作技能,或为单个窗口配备更多的服务人员,都是提速的办法。
提高单个 CPU 性能,或单个服务器安装更多的 CPU。这是一种 垂直扩展 思想。
21.1.2.6、消息中间件
上地、西二旗地铁 站外的九曲回肠的走廊,缓冲人流,进去之后再多口安检进站。
常见的消息中间件有 RabbitMQ、ActiveMQ ( Apache )、RocketMQ(阿里Apache)、kafka (Apache) 等。
当然还有其他手段解决并发问题,但是已经列举了最常用的解决方案,一般来说不同的并发场景用不同的策略,而策略可能是多种方式的优化组合。
21.2、进程和线程
在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个程序的执行实例就是一个进程。
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
进程和程序的关系:
程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源),它也是线程的容器。
Linux 进程有父进程、子进程;Windows 的进程是平等关系。
线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。
一个标准的线程由线程 ID,当前指令指针(PC),寄存器集合和堆栈组成。
在许多系统中,创建一个线程比创建一个进程快10-100 倍。
进程、线程的理解:
现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。
进程就是独立的王国,进程间不可以随便的共享数据。
线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。
21.2.1、线程的状态
状态 | 含义 |
---|---|
就绪(Ready) | 线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占。 |
运行(Running) | 线程正在运行。 |
阻塞(Blocked) | 线程等待外部事件发生而无法运行,如I/O操作。 |
终止(Terminated) | 线程完成,或退出,或被取消。 |
21.2.2、Python 中的进程和线程
进程会启动一个解释器进程,线程共享一个解释器进程。
21.3、Python 线程开发
Python 的线程开发使用标准库 threading。
21.3.1、Thread 类
# 签名
def __init__(self, group=None, target=None, name=None, args=(),kwargs=None, *, daemon=None)
参数名 | 含义 |
---|---|
target | 线程调用的对象,就是目标函数 |
name | 为线程起个名字 |
args | 为目标函数传递实参,元组 |
kwargs | 为目标函数关键字传参,字典 |
21.3.2、线程启动
import threading
# 待线程调用对象
def worker():
print("I'm working")
print('finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
通过 threading.Thread
创建一个线程对象,target 是目标函数,name 可以指定名称。但是线程没有启动,需要调用 start 方法。
线程之所以执行函数,是因为线程中就是执行代码的,而最简单的封装就是函数,所以还是函数调用。
函数执行完,线程也就退出了。那么,如果不让线程退出,或者让线程一直工作怎么办呢?
import threading
import time
# 待线程调用对象
def worker():
while True:
time.sleep(1)
print("I'm working")
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
21.3.3、线程退出
Python 没有提供线程退出的方法,线程在下面情况时退出:
1、线程函数内语句执行完毕。
2、线程函数中抛出未处理的异常。
import threading
import time
# 待线程调用对象
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(1)
print("I'm working")
count += 1
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
print("=== End ===")
Python 的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了。
21.3.4、线程的参数
import threading
import time
# 待线程调用对象
def add(x, y):
print('{} + {} = {}'.format(x, y, x + y, threading.current_thread().ident))
thread1 = threading.Thread(target=add, name='add', args=(4, 5)) # 线程对象
thread1.start() # 启动
time.sleep(2)
thread2 = threading.Thread(target=add, name='add', args=(5,), kwargs={'y': 4}) # 线程对象
thread2.start() # 启动
time.sleep(2)
thread3 = threading.Thread(target=add, name='add', kwargs={'x': 4, 'y': 5}) # 线程对象
thread3.start() # 启动
线程传参和函数传参没什么区别,本质上就是函数传参。
21.3.5、threading 的属性和方法
名称 | 含义 |
---|---|
current_thread() | 返回当前线程对象 |
main_thread() | 返回主线程对象 |
active_count() | 当前处于alive状态的线程个数 |
enumerate() | 返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程 |
get_ident() | 返回当前线程的ID,非0整数 |
active_count、enumerate 方法返回的值还包括主线程。
import threading
import time
def showthreadinfo():
print('currentthread = {}'.format(threading.current_thread()))
print('main thread = {}'.format(threading.main_thread()))
print('active count= {}'.format(threading.active_count()))
def worker():
count = 0
showthreadinfo()
while True:
if count > 5:
break
time.sleep(1)
count += 1
print("I'm working")
t = threading.Thread(target=worker, name='worker') # 线程对象
showthreadinfo()
t.start() # 启动
print("===End===")
21.3.6、Thread 实例的属性和方法
名称 | 含义 |
---|---|
name | 只是一个名字,只是个标识,名称可以重名。getName()、setName()获取、设置这个名词。 |
ident | 线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可以访问。此ID可以重复使用。 |
is_alive() | 返回线程是否活着。 |
注意:线程的 name 这是一个名称,可以重复;ID 必须唯一,但可以在线程退出后再利用。
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(1)
count += 1
print(1, 'in thread -->', threading.current_thread().name)
t = threading.Thread(target=worker, name='worker') # 线程对象
print(t.ident)
t.start() # 启动
while True:
time.sleep(1)
if t.is_alive():
print(2, 'in main -->', '{} {} alive'.format(t.name, t.ident))
else:
print(2.1, 'in main -->', '{} {} dead'.format(t.name, t.ident))
# t.start() # RuntimeError: threads can only be started once
名称 | 含义 |
---|---|
start() | 启动线程。每一个线程必须且只能执行该方法一次。 |
run() | 运行线程函数。 |
为了演示,派生一个 Thread 的子类。
21.3.6.1、start 方法
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(1)
count += 1
print(1, 'in thread -->', 'worker running')
class MyThread(threading.Thread):
def start(self):
print(2, 'in son Thread -->', 'start ...')
super().start()
def run(self):
print(3, 'in son Thread -->', 'run ...')
super().run()
t = MyThread(name='worker', target=worker)
t.start()
运行结果:
2 in son Thread --> start ...
3 in son Thread --> run ...
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
21.3.6.2、run 方法
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(1)
count += 1
print(1, 'in thread -->', 'worker running')
class MyThread(threading.Thread):
def start(self):
print(2, 'in son Thread -->', 'start ...')
super().start()
def run(self):
print(3, 'in son Thread -->', 'run ...')
super().run()
t = MyThread(name='worker', target=worker)
# t.start()
t.run()
运行结果:
3 in son Thread --> run ...
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
1 in thread --> worker running
start() 方法会调用 run() 方法,而 run() 方法可以运行函数。
这两个方法看似功能重复了,这么看来留一个方法就可以了。是这样吗?
21.3.6.3、start 和 run 的区别
在线程函数中,增加打印线程的名字的语句,看看能看到什么信息。
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(1)
count += 1
print(1, 'in thread -->', 'worker running')
print(1.1, 'in thread -->', threading.current_thread().name)
class MyThread(threading.Thread):
def start(self):
print(2, 'in son Thread -->', 'start ...')
super().start()
def run(self):
print(3, 'in son Thread -->', 'run ...')
super().run()
t = MyThread(name='worker', target=worker)
# t.start()
t.run() # 分别执行start或者run方法
使用 start 方法启动线程,启动了一个新的线程,名字叫做 worker。但是使用 run 方法,并没有启动新的线程,就是在主线程中调用了一个普通的函数而已。
因此,启动线程请使用 start 方法,才能启动多个线程。
21.3.7、多线程
顾名思义,多个线程,一个进程中如果有多个线程,就是多线程,实现一种并发。
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(0.5)
count += 1
print(1, 'in thread -->', 'worker running')
print(1.1, 'in thread -->', threading.current_thread().name, threading.current_thread().ident)
class MyThread(threading.Thread):
def start(self):
print(2, 'in son Thread -->', 'start ...')
super().start()
def run(self):
print(3, 'in son Thread -->', 'run ...')
super().run()
t1 = MyThread(name='工人1', target=worker)
t2 = MyThread(name='工人2', target=worker)
# t1.start()
# t2.start()
t1.run()
t2.run()
没有开启新的线程,这就是普通函数调用,所以执行完 t1.run(),然后执行 t2.run(),这里就不是多线程。
当使用 start 方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程。
一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。一个进程至少有一个主线程。
其他线程称为工作线程。
21.3.8、线程安全
IPython 中演示,python 命令行、pycharm 都不能演示出效果。
import threading
def worker():
for _ in range(100):
print("{} is running.".format(threading.current_thread().name))
for x in range(1, 5):
name = "worker {}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
输出:
......
worker 4 is running.
worker 2 is running.worker 1 is running.
worker 1 is running.
worker 1 is running.worker 4 is running.
worker 4 is running.
......
看代码,应该是一行行打印,但是很多字符串打在了一起,为什么?
说明 print 函数被打断了,被线程切换打断了。print 函数分两步,第一步打印字符串,第二步换行,就在这之间,发生了线程的切换。
这说明 print 函数是线程不安全的。
线程安全:线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的。
上例中,本以为 print 应该是打印文本之后紧跟着一个换行的,但是有时候确实好几个文本在一起,后面跟上换行,而且发生这种情况的时机不确定。
所以,print 函数不是线程安全函数。如果是这样,多线程编程的时候,print 输出日志,不能保证一个输出后面一定立即换行了,怎么办?
21.3.8.1、不让print打印换行
import threading
def worker():
for _ in range(100):
print("{} is running.\n".format(threading.current_thread().name), end='')
for x in range(1, 5):
name = "worker {}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
字符串是不可变的类型,它可以作为一个整体不可分割输出。end='' 就不再让 print 输出换行了。
21.3.8.2、使用logging
标准库里面的 logging 模块,日志处理模块,属于线程安全模块,生产环境代码都使用 logging。
import logging
import threading
def worker():
for _ in range(100):
# print("{} is running.\n".format(threading.current_thread().name), end='')
logging.warning("{} is running.".format(threading.current_thread().name))
for x in range(1, 5):
name = "worker {}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
21.3.9、daemon 和 non-daemon 线程
注意:这里的 daemon 不是 Linux 中的守护进程。
进程靠线程执行代码,至少有一个主线程,其它线程是工作线程。
主线程是第一个启动的线程。
父线程:如果线程 A 中启动了一个线程 B,A 就是 B 的父线程。
子线程:B 就是 A 的子线程。
Python 中,构造线程的时候,可以设置 daemon 属性,这个属性必须在 start 方法前设置好。
# 源码Thread的__init__方法中
if daemon is not None:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
self._ident = None
线程 daemon 属性,如果设定就是用户的设置,否则就取当前线程的 daemon 值。
主线程是 non-daemon 线程,即 daemon = False。
import time
import threading
def foo():
time.sleep(5)
for i in range(20):
print(i)
# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=False)
t.start()
print('Main Thread Exiting')
发现线程 t 依然执行,主线程已经执行完,但是一直等着线程 t。
修改为 t = threading.Thread(target=foo,daemon=True)
试一试。程序立即结束了,根本没有等线程 t。
名称 | 含义 |
---|---|
daemon属性 | 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常 |
isDaemon() | 是否是daemon线程 |
setDaemon | 设置为daemon线程,必须在start方法之前设置 |
总结:
线程具有一个 daemon 属性,可以显示设置为 True 或 False,也可以不设置,则取默认值 None。
如果不设置 daemon,就取当前线程的 daemon 来设置它。主线程是 non-daemon 线程,即 daemon = False。
从主线程创建的所有线程不设置 daemon 属性,则默认都是 daemon = False,也就是 non-daemon 线程。
Python 程序在没有活着的 non-daemon 线程运行时随主线程退出,也就是剩下的只能是 daemon 线程,主线程才能退出,否则主线程就只能等待。
思考下面程序的输出是什么?
import time
import threading
def bar():
time.sleep(10)
print('bar')
def foo():
for i in range(20):
print(i)
t = threading.Thread(target=bar, daemon=False)
t.start()
# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=True)
t.start()
print('Main Thread Exiting')
上例中,会不会输出 bar 这个字符串,如果没有,如何修改才能打印出来?
time.sleep(2)
print('Main Thread Exiting')
再看一个例子:
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
# 主线程是non-daemon线程
t1 = threading.Thread(target=foo, args=(20,), daemon=True) # 调换10和20看看效果
t1.start()
t2 = threading.Thread(target=foo, args=(10,), daemon=False)
t2.start()
time.sleep(2)
print('Main Thread Exiting')
上例说明,如果有 non-daemon 线程的时候,主线程退出时,也不会杀掉所有 daemon 线程,直到所有 non-daemon 线程全部结束;如果还有 daemon 线程,主线程需要退出,会结束所有 daemon 线程,退出。
21.3.10、join 方法
先看一个简单的例子,看看效果:
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
t1 = threading.Thread(target=foo, args=(10,), daemon=True)
t1.start()
t1.join() # 设置join,取消join对比一下
print('Main Thread Exiting')
使用了 join 方法后,daemon 线程执行完了,主线程才退出了。
join(timeout=None),是线程的标准方法之一。
一个线程中调用另一个线程的 join 方法,调用者将被阻塞,直到被调用线程终止。
一个线程可以被 join 多次。
timeout 参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束。
调用谁的 join 方法,就是 join 谁,就要等谁。
21.3.11、daemon 线程应用场景
简单来说就是,本来并没有 daemon thread,为了简化程序员的工作,让他们不用去记录和管理那些后台线程,创造了一个 daemon thread 的概念。这个概念唯一的作用就是,当你把一个线程设置为 daemon,它会随主线程的退出而退出。
主要应用场景有:
1、后台任务。如发送心跳包、监控,这种场景最多。
2、主线程工作才有用的线程。如主线程中维护着公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作也没有意义了,一起退出最合适。
3、随时可以被终止的线程。
如果主线程退出,想所有其它工作线程一起退出,就使用 daemon=True 来创建工作线程。
比如,开启一个线程定时判断 WEB 服务是否正常工作,主线程退出,工作线程也没有必须存在了,应该随着主线程退出一起退出。这种 daemon 线程一旦创建,就可以忘记它了,只需关心主线程什么时候退出就行了。
daemon 线程,简化了程序员手动关闭线程的工作。
如果在 non-daemon 线程 A 中,对另一个 daemon 线程 B 使用了 join 方法,这个线程 B 设置成 daemon 就没有什么意义了,因为 non-daemon 线程 A 总是要等待 B。
如果在一个 daemon 线程 C 中,对另一个 daemon 线程 D 使用了 join 方法,只能说明 C 要等待 D,主线程退出,C 和 D 不管是否结束,也不管它们谁等谁,都要被杀掉。
举例:
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print("t1's daemon = {}".format(threading.current_thread().daemon))
t2 = threading.Thread(target=bar)
t2.start()
print("t2's daemon = {}".format(t2.daemon))
t1 = threading.Thread(target=foo, daemon=True)
t1.start()
time.sleep(3)
print("Main Thread Exiting")
上例,只要主线程退出,2 个工作线程都结束。可以使用 join,让线程结束不了,怎么做?
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print("t1's daemon = {}".format(threading.current_thread().daemon))
t2 = threading.Thread(target=bar)
t2.start()
print("t2's daemon = {}".format(t2.daemon))
t2.join()
t1 = threading.Thread(target=foo, daemon=True)
t1.start()
t1.join()
print("Main Thread Exiting")
21.3.12、threading.local 类
import threading
import time
# 局部变量实现
def worker():
x = 0
for i in range(100):
time.sleep(0.0001)
x += 1
print(threading.current_thread(), x)
for _ in range(10):
threading.Thread(target=worker).start()
上例使用多线程,每个线程完成不同的计算任务。x 是局部变量。能否改造成使用全局变量完成。
import threading
import time
class A:
def __init__(self):
self.x = 0
# 全局对象
global_data = A()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.0001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for _ in range(10):
threading.Thread(target=worker).start()
上例虽然使用了全局对象,但是线程之间互相干扰,导致了错误的结果。
能不能使用全局对象,还能保持每个线程使用不同的数据呢?
python 提供 threading.local
类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储的数据其他线程看不见。
import threading
import time
# 全局对象
global_data = threading.local()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.0001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for _ in range(10):
threading.Thread(target=worker).start()
结果显示和使用局部变量的效果一样。再看 threading.local 的例子。
import threading
X = 'abc'
ctx = threading.local()
ctx.x = 123
print(ctx, type(ctx), ctx.x)
def worker():
print(X)
print(ctx)
print(ctx.x)
print('working')
worker() # 普通函数调用
print('-' * 80)
threading.Thread(target=worker).start() # 另起一个线程
从运行结果来看,另起一个线程打印 ctx.x
出错了。
AttributeError: '_thread._local' object has no attribute 'x'
但是,ctx 打印没有出错,说明看到 ctx,但是 ctx 中的 x 看不到,这个 x 不能跨线程。
threading.local
类构建了一个大字典,其元素是每个线程实例的地址为 key 和线程对象引用线程单独的字典的映射,如下:
id(Thread) -> (ref(Thread),thread-local dict)
通过 threading.local
实例就可在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全。
21.3.13、Timer 定时器
threading.Timer
继承自 Thread,这个类用来定义多久执行一个函数。
class threading.Timer(interval, function, args=None, kwargs=None)
start 方法执行之后,Timer 对象会处于等待状态,等待了 interval 之后,开始执行 function 函数。
如果在执行函数之前的等待阶段,使用了 cancel 方法,就会跳过执行函数结束。
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5, worker)
t.name = 'w1'
t.start() # 启动线程
print(threading.enumerate())
t.cancel() # 取消,可以注释这一句看看如何定时执行
time.sleep(1)
print(threading.enumerate())
如果线程中 worker 函数已经开始执行,cancel 就没有任何效果了。
总结:Timer 是线程 Thread 的子类,就是线程类,具有线程的能力和特征。
它的实例是能够延时执行目标函数的线程,在真正执行目标函数之前,都可以 cancel 它。
提前 cancel:
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5, worker)
t.name = 'w1'
t.cancel() # 提前取消
t.start() # 启动线程
t.join() # 就算join也没用
print(threading.enumerate())
print(threading.enumerate())
21.4、线程同步
线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。
不同操作系统实现技术有所不同,有临界区( Critical Section )、互斥量( Mutex)、信号量( Semaphore)、事件 Event 等。
21.4.1、Event
Event 事件,是线程间通信机制中最简单的实现,使用一个内部的标记 flag,通过 flag 的 True 或 False 的变化来进行操作。
名称 | 含义 |
---|---|
set() | 标记设置为True |
clear() | 标记设置为False |
is_set() | 标记是否为True |
wait(timeout=None) | 设置等待标记为True的时长,None为无限等待。等到返回True,未等到超时了返回False |
需求:老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到生产了 10 个杯子。
from threading import Event, Thread
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def boss(event: Event):
logging.info("I'm boss, waiting for you")
# 等待
event.wait()
logging.info("Good Job")
def worker(event: Event, count=10):
logging.info("I'm working for U.")
cups = []
while True:
logging.info('make 1')
time.sleep(0.5)
cups.append(1)
if len(cups) >= count:
# 通知
event.set()
break
logging.info("I finished my job. cups={}".format(cups))
event = Event()
w = Thread(target=worker, args=(event,))
b = Thread(target=boss, args=(event,))
w.start()
b.start()
总结:使用同一个 Event 对象的标记 flag。谁 wait 就是等到 flag 变为 True,或等到超时返回 False。不限制等待的个数。
wait 的使用:
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)
def do(event: Event, interval: int):
while not event.wait(interval):
logging.info('do sth.')
e = Event()
Thread(target=do, args=(e, 3)).start()
e.wait(10)
e.set()
print('main exit')
Event 的 wait 优于 time.sleep,它会更快的切换到其它线程,提高并发效率。
Event 练习:实现 Timer,延时执行的线程,延时计算add(x, y)。
思路:Timer 的构造函数中参数得有哪些?如何实现start启动一个线程执行函数?如何cancel取消待执行任务?
# 思路实现
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)
def add(x: int, y: int):
logging.info(x + y)
class Timer:
def __init__(self, fn, interval, *args, **kwargs):
pass
def start(self):
pass
def cancel(self):
pass
完整实现:
import datetime
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)
def add(x: int, y: int):
logging.info(x + y)
class Timer:
def __init__(self, fn, interval, *args, **kwargs):
self.fn = fn
self.interval = interval
self.args = args
self.kwargs = kwargs
self.event = Event()
def start(self):
Thread(target=self.__run).start()
def cancel(self):
self.event.set()
def __run(self):
start = datetime.datetime.now()
logging.info('waiting')
self.event.wait(self.interval)
if not self.event.is_set():
self.fn(*self.args, **self.kwargs)
delta = (datetime.datetime.now() - start).total_seconds()
if not self.event.is_set():
logging.info('You took {}s and finished'.format(delta))
self.event.set()
else:
logging.info('You took {}s but not finished'.format(delta))
t = Timer(add, 10, 4, 50)
t.start()
e = Event()
e.wait(4)
# t.cancel()
print('======MainThread exit======')
21.4.2、Lock
锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。
需求:订单要求生产1000个杯子,组织10个工人生产。
import threading
from threading import Thread, Lock
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
def worker(count=10):
logging.info("I'm working for U.")
while len(cups) < count:
time.sleep(0.0001)
cups.append(1)
logging.info('I finished. cups={}'.format(len(cups)))
for _ in range(10):
Thread(target=worker, args=(1000,)).start()
输出:
2023-03-22 11:00:31,993 Thread-1 (worker) 6352 I'm working for U.
2023-03-22 11:00:31,993 Thread-2 (worker) 9980 I'm working for U.
2023-03-22 11:00:31,994 Thread-3 (worker) 11612 I'm working for U.
2023-03-22 11:00:31,994 Thread-4 (worker) 1164 I'm working for U.
2023-03-22 11:00:31,994 Thread-5 (worker) 6628 I'm working for U.
2023-03-22 11:00:31,995 Thread-6 (worker) 10912 I'm working for U.
2023-03-22 11:00:31,995 Thread-7 (worker) 8140 I'm working for U.
2023-03-22 11:00:31,996 Thread-8 (worker) 6844 I'm working for U.
2023-03-22 11:00:31,996 Thread-9 (worker) 9240 I'm working for U.
2023-03-22 11:00:31,996 Thread-10 (worker) 5268 I'm working for U.
2023-03-22 11:00:32,186 Thread-8 (worker) 6844 I finished. cups=1000
2023-03-22 11:00:32,187 Thread-6 (worker) 10912 I finished. cups=1001
2023-03-22 11:00:32,187 Thread-2 (worker) 9980 I finished. cups=1002
2023-03-22 11:00:32,187 Thread-7 (worker) 8140 I finished. cups=1003
2023-03-22 11:00:32,188 Thread-1 (worker) 6352 I finished. cups=1004
2023-03-22 11:00:32,188 Thread-9 (worker) 9240 I finished. cups=1005
2023-03-22 11:00:32,189 Thread-3 (worker) 11612 I finished. cups=1006
2023-03-22 11:00:32,189 Thread-5 (worker) 6628 I finished. cups=1007
2023-03-22 11:00:32,189 Thread-10 (worker) 5268 I finished. cups=1008
2023-03-22 11:00:32,190 Thread-4 (worker) 1164 I finished. cups=1009
从上例的运行结果看出,多线程调度,导致了判断失误,多生产了杯子。如何修改?
Lock:加锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。
名称 | 含义 |
---|---|
acquire(blocking=True, timeout=-1) | 默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置。成功获取锁,返回True,否则返回False |
release() | 释放锁。可以从任何线程调用释放。 |
import threading
from threading import Thread, Lock
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
lock = Lock()
def worker(count=10):
logging.info("I'm working for U.")
# lock.acquire()
# while len(cups) < count:
# time.sleep(0.0001)
# cups.append(1)
# lock.release()
flag = False
while True:
lock.acquire()
if len(cups) >= count:
flag = True
# lock.release() 这里释放锁如何?
time.sleep(0.0001)
if not flag:
cups.append(1)
lock.release() # 这里释放锁如何?
if flag:
break
logging.info('I finished. cups={}'.format(len(cups)))
for _ in range(10):
Thread(target=worker, args=(1000,)).start()
21.4.3、Lock 释放
计数器类,可以加,可以减。
一般来说,加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦出现异常,锁无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。
加锁、解锁常用语句:
1、使用 try...finally
语句保证锁的释放。
2、with 上下文管理,锁对象支持上下文管理。
改造 Couter 类,如下:
import threading
from threading import Thread, Lock
class Counter:
def __init__(self):
self._val = 0
self.__lock = Lock()
@property
def value(self):
with self.__lock:
return self._val
def inc(self):
try:
self.__lock.acquire()
self._val += 1
finally:
self.__lock.release()
def dec(self):
with self.__lock:
self._val -= 1
def run(c: Counter, count=100):
for _ in range(count):
for i in range(-50, 50):
if i < 0:
c.dec()
else:
c.inc()
c = Counter()
c1 = 10
c2 = 1000
for i in range(c1):
Thread(target=run, args=(c, c2)).start()
while True:
if threading.active_count() == 1:
print(threading.enumerate())
print(c.value)
break
else:
print(threading.enumerate())
21.4.4、锁的应用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源需要锁吗?不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁。
使用锁的注意事项:
-
少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。
-
加锁时间越短越好,不需要就立即释放锁。
-
一定要避免死锁。
不使用锁,有了效率,但是结果是错的;使用了锁,效率低下,但是结果是对的。所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧。
21.4.5、非阻塞锁使用
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker(tasks):
for task in tasks:
time.sleep(0.001)
if task.lock.acquire(False): # 获取锁则返回True
logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))
# 适当的时机释放锁,为了演示不释放
else:
logging.info('{} {} is working'.format(threading.current_thread(), task.name))
class Task:
def __init__(self, name):
self.name = name
self.lock = threading.Lock()
tasks = [Task('task-{}'.format(x)) for x in range(10)]
for i in range(5):
threading.Thread(target=worker, name='worker-{}'.format(i), args=(tasks,)).start()
21.4.6、可重入锁 RLock
可重入锁,是线程相关的锁。
线程 A 获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程 A 中做和 acquire 次数相同的 release。
import threading
import time
lock = threading.RLock()
print(lock.acquire())
print('-' * 80)
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
# print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {} '.format(threading.current_thread().ident))
print('lock in main thread {}'.format(lock)) # 注意观察lock对象的信息
lock.release()
# lock.release() # 多了一次
print('-' * 80)
print(lock.acquire(blocking=False)) # 1次
# threading.Timer(3, lambda x: x.release(), args=(lock,)).start() # 跨线程了,异常
lock.release()
print('-' * 80)
# 测试多线程
print(lock.acquire())
def sub(l):
print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞
print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
print('lock in sub thread {}'.format(lock))
l.release()
print('sub 1')
l.release()
print('sub 2')
threading.Timer(2, sub, args=(lock,)).start() # 传入同一个lock对象
print('+' * 80)
print(lock.acquire())
lock.release()
time.sleep(5)
print("释放主线程锁")
lock.release()
可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞获取锁。当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。
21.4.7、Condition
构造方法 Condition(lock=None),可以传入一个 Lock 或 RLock 对象,默认是 RLock。
名称 | 含义 |
---|---|
acquire(*args) | 获取锁 |
wait(self, timeout=None) | 等待或超时 |
notify(n=1) | 唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作 |
notify_all() | 唤醒所有等待的线程 |
Condition 用于生产者、消费者模型,为了解决生产者消费者速度匹配问题。
先看一个例子,消费者消费速度大于生产者生产速度。
from threading import Thread, Event
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 只作演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event 只是为了使用方便,与逻辑无关
def produce(self, total):
for _ in range(total):
data = random.randint(0, 100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
data = self.data
logging.info("received {}".format(data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()
这个例子采用了消费者主动消费,消费者浪费了大量时间,主动来查看有没有数据。
能否换成一种通知机制,有数据通知消费者来消费呢?使用 Condition 对象。
from threading import Thread, Event, Condition
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 只作演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event 只是为了使用方便,与逻辑无关
self.cond = Condition()
def produce(self, total):
for _ in range(total):
data = random.randint(0, 100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) # 模拟产生数据速度
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait()
logging.info("received {}".format(self.data))
self.data = None
self.event.wait(0.5) # 模拟消费的速度
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()
上例中,消费者等待数据,如果生产者准备好了会通知消费者消费,省得消费者反复来查看数据是否就绪。
如果是 1 个生产者,多个消费者怎么改?
from threading import Thread, Event, Condition
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 只作演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event 只是为了使用方便,与逻辑无关
self.cond = Condition()
def produce(self, total):
for _ in range(total):
data = random.randint(0, 100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) # 模拟产生数据速度
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait() # 阻塞等通知
logging.info("received {}".format(self.data))
self.event.wait(0.5) # 模拟消费的速度
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
# 增加消费者
for i in range(5):
c = Thread(target=d.consume, name='consumer-{}'.format(i))
c.start()
p.start()
self.cond.notify_all()
发通知,修改为:self.cond.notify(n=2)
试一试看看结果?
这个例子,可以看到实现了消息的 一对多
,这其实就是 广播
模式。
注:上例中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解 Condition 的使用,和生产者消费者模型。
Condition 用于生产者消费者模型中,解决生产者消费者速度匹配的问题。采用了通知机制,非常有效率。
使用方式:
使用 Condition,必须先 acquire,用完了要 release,因为内部使用了锁,默认使用 RLock 锁,最好的方式是使用 with 上下文。
消费者 wait,等待通知:
生产者生产好消息,对消费者发通知,可以使用 notify 或者 notify_all 方法。
21.4.8、Barrier
有人翻译成栅栏,我建议使用 屏障
,可以想象成路障、道闸。3.2 引入 Python 的新功能。
名称 | 含义 |
---|---|
Barrier(parties, action=None, timeout=None) | 构建 Barrier 对象,指定参与方数目。timeout 是 wait 方法未指定超时的默认值。 |
n_waiting | 当前在屏障中等待的线程数。 |
parties | 各方数,就是需要多少个等待。 |
wait(timeout=None) | 等待通过屏障。返回 0 到 (线程数 - 1) 的整数,每个线程返回不同。如果 wait 方法设置了超时,并超时发生,屏障将处于 broken 状态。 |
import threading
import logging
FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
def worker(barrier: threading.Barrier):
logging.info('waiting for {} threads.'.format(barrier.n_waiting))
try:
barrier_id = barrier.wait()
logging.info('after barrier {}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('Broken Barrier')
barrier = threading.Barrier(3)
for x in range(3): # 改成4,5,6试一试
threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()
threading.Event().wait(1)
logging.info('started')
运行结果:
2023-03-22 19:53:38,635 [worker-0, 1124] waiting for 0 threads.
2023-03-22 19:53:39,635 [worker-1, 944] waiting for 1 threads.
2023-03-22 19:53:40,637 [worker-2, 1708] waiting for 2 threads.
2023-03-22 19:53:40,637 [worker-2, 1708] after barrier 2
2023-03-22 19:53:40,637 [worker-0, 1124] after barrier 0
2023-03-22 19:53:40,637 [worker-1, 944] after barrier 1
2023-03-22 19:53:41,637 [MainThread, 2092] started
从运行结果看出:
所有线程冲到了 Barrier 前等待,直到到达 parties 的数目,屏障打开,所有线程停止等待,继续执行。
再有线程 wait,屏障继续阻拦,直到参与方数目达到设定值。
举例,赛马比赛中所有马匹就位,开闸。下一批马陆续来到闸门前等待比赛。
名称 | 含义 |
---|---|
broken | 如果屏障处于打破的状态,返回 True。 |
abort() | 将屏障置于 broken 状态,等待中的线程或者调用等待方法的线程都会抛出 BrokenBarrierError 异常,直到 reset 方法来恢复屏障。 |
reset() | 恢复屏障,重新开始拦截。 |
import threading
import logging
FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
def worker(barrier: threading.Barrier):
logging.info('waiting for {} threads.'.format(barrier.n_waiting))
try:
barrier_id = barrier.wait()
logging.info('after barrier {}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('Broken Barrier.')
barrier = threading.Barrier(3)
for x in range(0, 9):
if x == 2:
barrier.abort()
elif x == 6:
barrier.reset()
threading.Event().wait(1)
threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()
上例中,屏障中等待了 2 个,屏障就被 broken 了,waiting 的线程抛了 BrokenBarrierError 异常,新 wait 的线程也抛异常,直到屏障恢复,才继续按照 parties 数目要求继续拦截线程。
21.4.8.1、wait 方法超时
如果 wait 方法超时发生,屏障将处于 broken 状态,直到 reset。
import threading
import logging
FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
def worker(barrier: threading.Barrier, i: int):
logging.info('waiting for {} threads.'.format(barrier.n_waiting))
try:
logging.info(barrier.broken) # 是否broken
if i < 3:
barrier_id = barrier.wait(1) # 超时后,屏障broken
else:
if i == 6:
barrier.reset() # 恢复屏障
barrier_id = barrier.wait()
logging.info('after barrier {}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('Broken Barrier. run.')
barrier = threading.Barrier(3)
for x in range(0, 9):
threading.Event().wait(2)
threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier, x)).start()
21.4.8.2、barrier 应用
并发初始化:
所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能正常工作。
10 个线程做 10 种工作准备,每个线程负责一种工作,只有这 10 个线程都完成后,才能继续工作,先完成的要等待后完成的线程。
例如,启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有都满足了,程序才能继续向后执行。假设数据库连接失败,则初始化工作失败,就要 abort,barrier 置为 broken,所有线程收到异常退出。
工作量:
有 10 个计算任务,完成 6 个,就算工作完成。
21.4.9、semaphore 信号量
和 Lock 很像,信号量对象内部维护一个倒计数器,每一次 acquire 都会减 1,当 acquire 方法发现计数为 0 就阻塞请求的线程,直到其它线程对信号量 release 后,计数大于 0,放行阻塞的线程。
名称 | 含义 |
---|---|
Semaphore(value=1) | 构造方法。value 小于 0,抛 ValueError 异常。 |
acquire(blocking=True, timeout=None) | 获取信号量,计数器减 1,获取成功返回 True。 |
release() | 释放信号量,计数器加 1。 |
计数器永远不会低于 0,因为 acquire 的时候,发现是 0,都会被阻塞。
import threading
import logging
import time
FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
def worker(s: threading.Semaphore):
logging.info('4 --> in sub thread')
logging.info(s.acquire()) # 阻塞
logging.info('sub thread over')
# 信号量
s = threading.Semaphore(3)
logging.info(s.acquire())
print(1, '-->', s._value)
logging.info(s.acquire())
print(2, '-->', s._value)
logging.info(s.acquire())
print(3, '-->', s._value)
threading.Thread(target=worker, args=(s,)).start()
time.sleep(2)
logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3)) # 阻塞3秒
# 释放
logging.info('released')
s.release()
应用举例:
连接池:因为资源有限,且开启一个连接成本高,所以,使用连接池。
一个简单的连接池:连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。
class Conn:
def __init__(self, name):
self.name = name
class Pool:
def __init__(self, count: int):
self.count = count
self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]
def _connect(self, conn_name):
# 创建连接的方法,返回一个名称
return Conn(conn_name)
def get_conn(self):
# 从池中拿走一个连接
if len(self.pool) > 0:
return self.pool.pop()
def return_conn(self, conn: Conn):
# 向池中添加一个连接
self.pool.append(conn)
真正的连接池的实现比上面的例子要复杂的多,这里只是简单的一个功能的实现。
本例中,get_conn() 方法在多线程的时候有线程安全问题。假设池中正好有一个连接,有可能多个线程判断池的长度是大于 0 的,当一个线程拿走了连接对象,其他线程再来 pop 就会抛异常的。如何解决?
1、加锁,在读写的地方加锁。
2、使用信号量 Semaphore。
使用信号量对上例进行修改:
import threading
import logging
import random
FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
class Conn:
def __init__(self, name):
self.name = name
def __repr__(self):
return self.name
class Pool:
def __init__(self, count: int):
self.count = count
# 池中是连接对象的列表
self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]
self.semaphore = threading.Semaphore(count)
def _connect(self, conn_name):
# 创建连接的方法,返回一个名称
return Conn(conn_name)
def get_conn(self):
# 从池中拿走一个连接
print('-' * 40)
self.semaphore.acquire()
print('-' * 40)
conn = self.pool.pop()
return conn
def return_conn(self, conn: Conn):
# 向池中添加一个连接
self.pool.append(conn)
self.semaphore.release()
# 连接池初始化
pool = Pool(3)
def worker(pool: Pool):
conn = pool.get_conn()
logging.info(conn)
# 模拟使用了一段时间
threading.Event().wait(random.randint(1, 4))
pool.return_conn(conn)
for i in range(6):
threading.Thread(target=worker, name="worker-{}".format(i), args=(pool,)).start()
上例中,使用信号量解决资源有限的问题。
如果池中有资源,请求者获取资源时信号量减 1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加 1,等待线程就可以被唤醒拿走资源。
注意:这个例子不能用到生成环境,只是为了说明信号量使用的例子,还有很多未完成功能。
21.4.9.1、semaphore 缺陷
假设如果还没有使用信号量,就 release,会怎么样?
import logging
import threading
sema = threading.Semaphore(3)
logging.warning(sema.__dict__)
for i in range(3):
sema.acquire()
logging.warning('-------------------------')
logging.warning(sema.__dict__)
for i in range(4):
sema.release()
logging.warning(sema.__dict__)
for i in range(3):
sema.acquire()
logging.warning('---------------------------')
logging.warning(sema.__dict__)
sema.acquire()
logging.warning('------------------------------')
logging.warning(sema.__dict__)
从上例输出结果可以看出,竟然内置计数器达到了 4,这样实际上超出我们的最大值,需要解决这个问题。
21.4.9.2、BoundedSemaphore 类
有界的信号量,不允许使用 release 超出初始值的范围,否则,抛出 ValueError 异常。
import logging
import threading
sema = threading.BoundedSemaphore(3)
logging.warning(sema.__dict__)
for i in range(3):
sema.acquire()
logging.warning('-' * 80)
logging.warning(sema.__dict__)
sema.release()
sema.release()
sema.release()
logging.warning(sema.__dict__)
# sema.release() # ValueError: Semaphore released too many times
这样用有界信号量修改源代码,保证如果多 return_conn 就会抛异常。
如果归还了同一个连接多次怎么办,去重很容易判断出来。
# 如果使用了信号量,但是还没有用完
self.pool.append(conn)
self.semaphore.release()
假设一种极端情况,计数器还差 1 就满了,有三个线程 A、B、C 都执行了第一句,都没有来得及 release,这时候轮到线程 A release,正常的 release,然后轮到线程 C release,一定出问题,超界了,直接抛异常。
因此信号量,可以保证,一定不能多归还。
锁:只允许同一个时间一个线程独占资源。它是特殊的信号量,即信号量计数器初值为 1。
信号量:可以多个线程访问共享资源,但这个共享资源数量有限。
21.4.10、数据结构安全
Queue:标准库 queue 模块,提供 FIFO 的 Queue、LIFO 的队列、优先队列。
Queue 类是线程安全的,适用于多线程间安全的交换数据。内部使用了 Lock 和 Condition。
为什么讲魔术方法时,说实现容器的大小,不准确?如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走,就有可能被其他线程改了。
Queue 类的 size 虽然加了锁,但是,依然不能保证立即 get、put 就能成功,因为读取大小和 get、put 方法是分开的。
import queue
q = queue.Queue(8)
if q.qsize() == 7:
q.put() # 上下两句可能被打断
if q.qsize() == 1:
q.get() # 未必会成功
这里只是拿 Queue 作为一个例子说明,所有的数据结构,在多线程共享的时候,特别要考虑线程安全问题,否则很容易造成数据不一致。
21.5、GIL 全局解释器锁
CPython 在解释器进程级别有一把锁,做 GIL 全局解释器锁。
GIL 保证 CPython 进程中,只有一个线程执行字节码。甚至是在多核 CPU 的情况下,也是如此。
CPython 中:
IO 密集型,由于线程阻塞,就会调度其他线程;
CPU 密集型,当前线程可能会连续的获得 GIL,导致其它线程几乎无法使用 CPU。
在 CPython 中由于有 GIL 存在,IO 密集型,使用多线程;CPU 密集型,使用多进程,绕开 GIL。
新版 CPython 正在努力优化 GIL 的问题,但不是移除。如果非要强调多线程的效率问题,请绕行,选择其它语言 erlang、Go 等。
Python 中绝大多数内置数据结构的读写都是原子操作。
由于 GIL 的存在,Python 的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型的。
保留 GIL 的原因:Guido 坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用 Python。而且移除 GIL,会降低 CPython 单线程的执行效率。
测试下面 2 个程序:
import logging
import datetime
logging.basicConfig(level=logging.INFO, format='%(thread)s %(message)s')
start = datetime.datetime.now()
def calc():
total = 0
for _ in range(1000000000):
total += 1
calc()
calc()
calc()
calc()
calc()
delta = (datetime.datetime.now() - start).total_seconds()
logging.info(delta)
import logging
import datetime
import threading
logging.basicConfig(level=logging.INFO, format='%(thread)s %(message)s')
start = datetime.datetime.now()
def calc():
total = 0
for _ in range(1000000000):
total += 1
t1 = threading.Thread(target=calc)
t2 = threading.Thread(target=calc)
t3 = threading.Thread(target=calc)
t4 = threading.Thread(target=calc)
t5 = threading.Thread(target=calc)
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t1.join()
t2.join()
t3.join()
t4.join()
t5.join()
delta = (datetime.datetime.now() - start).total_seconds()
logging.info(delta)
从两段程序测试的结果来看,CPython 中多线程根本没有任何优势,和一个线程执行时间相当。
因为 GIL 的存在,尤其是像上面的计算密集型程序。