Python-多线程及生产者与消费者
一、前置知识
1. 队列基础
队列
from queue import Queue # 创建队列 # -- 限制队中最多有 maxsize 个元素 # -- 如果省略参数,默认元素个数无限制 q = Queue(100) q1 = Queue() # 元素入队 q.put(1) q.put(True) q.put('abc') # 队列的大小 print(q.qsize()) # 判断队满 print(q.full()) # 判断队空 print(q.empty()) # 元素出队 # 注意:如果队空,取元素时,会陷入阻塞状态,知道再往队中加入数据为止【***】 while not q.empty(): print(q.get())
2. 多线程
(1) 进程与线程的关系
''' 1. 一个进程可以有多个线程,但是必须有一个主线程 2. 进程之间互不影响,资源不共享 3. 线程之间,资源可以共享,共享的是线程所属的进程的内容 4. 线程必须依赖于进程存在 '''
(2) 创建线程——方式1
from threading import Thread def add(n1, n2): print('结果为:' + n1 + n2) def main(): # 创建一个线程 # -- target 函数的名称 # -- args 以元组的形式,传入函数所需的参数 t = Thread(target=add, args=(1, 2,)) # 开启线程 t.start() if __name__ == '__main__': main()
(3) 创建线程——方式2
''' 1. 通过继承 Thread类 创建线程的步骤 (1) 定义一个类 (2) 继承 Thread类 (3) 重写 run() 方法 (4) 在 run() 方法中写逻辑代码 2. 注意事项 (1) 子类继承 Thread类 后,实例化对象时,会自动执行父类中的 run()方法 所以我们可以重写 run(),然后在 run() 中执行我们自己的代码 (2) 一个子类继承了 Thread类,那么在对线程执行任何其他操作之前 它必须确保已调用基类的构造函数 -- 比如:传参时,需要调用的父类的构造函数 '''
from threading import Thread class MyThread(Thread): # 构造函数 def __init__(self, n1, n2): # 调用父类的构造函数:第一种方法 # threading.Thread.__init__(self) # 调用父类的构造函数:第二种方法 super().__init__() self.n1 = n1 self.n2 = n2 # 重写 run() 方法 def run(self): print('线程的名称:' + self.name) print(self.n1 + self.n2) def main(): # 实例化对象的过程,就是在创建线程 t1 = MyThread(1, 1) # 设置线程的名称 t1.setName('t1') # 开启线程 t1.start() if __name__ == '__main__': main()
(4) 锁的使用
- 一定要保证相关的线程使用的是同一把锁,否则加锁操作无意义
# 加锁之前 # ---------------------------------------------------------- from threading import Thread num = 0 # 声明共享资源 def Jia(): # 标注使用共享的资源 global num # 主逻辑代码 for i in range(10000000): num+=1 print(num) def main(): # 创建线程 t1 = Thread(target=Jia) t2 = Thread(target=Jia) t3 = Thread(target=Jia) # 开启线程 t1.start() t2.start() t3.start() if __name__ == '__main__': main()
# 加锁之后 # ---------------------------------------------------------- from threading import Thread from threading import Lock lock = Lock() # 声明锁,要保证相关的线程使用的是同一把锁 num = 0 # 声明共享资源 def Jia(lock): # 加锁 lock.acquire() # 标注使用共享的资源 global num # 主逻辑代码 for i in range(10000000): num+=1 print(num) # 释放锁 lock.release() def main(): # 创建线程 t1 = Thread(target=Jia, args=(lock,)) t2 = Thread(target=Jia, args=(lock,)) t3 = Thread(target=Jia, args=(lock,)) # 开启线程 t1.start() t2.start() t3.start() if __name__ == '__main__': main()
3. 进阶
(1) Thread.join()
- 作用:
阻塞当前所在的线程
,只有当执行 join() 的线程结束之后,才会解除阻塞 - 分析下面的代码:
- 阻塞前:在主线程中有一句
print('结束了')
,本意是想要在fn
函数执行完之后,再输出结束了
,但是因为主线程和t1线程是同步的,他们在同时执行,所以print('结束了')
的输出位置不一定是最后面,可能是在fn
执行一半的时候就输出结束了
- 阻塞后:
t1线程
调用了join()
,阻塞了当前所在线程,即阻塞了主线程,所以主线程需要等t1线程
结束后才可以继续执行主线程的内容,故实现了print('结束了')
在fn
执行完后在输出内容的需求
- 阻塞前:在主线程中有一句
# 阻塞前:也就是不调用 join() # ---------------------------------------------------------- import time from threading import Thread def fn(): for i in range(10): print(i) time.sleep(1.5) def main(): t1 = Thread(target=fn) t1.start() print('结束了') if __name__ == '__main__': main()
# 阻塞后:调用了 join() # ---------------------------------------------------------- import time from threading import Thread def fn(): for i in range(10): print(i) time.sleep(1.5) def main(): t1 = Thread(target=fn) t1.start() t1.join() print('结束了') if __name__ == '__main__': main()
(2) 守护进程
''' 1. 进程分为主进程、守护进程、非守护进程 2. 守护、非守护是相对于主进程 而言的 3. 守护进程,可以理解为不重要的进程,当主进程结束后,守护进程会强制结束 4. 非守护进程,是比守护进程重要的进程,当主进程结束后,守护进程不会被强制结束 '''
# t1进程是非守护进程:t1进程会陷入死循环 # ---------------------------------------------------------- from threading import Thread def fn(): while True: print(1) def main(): t1 = Thread(target=fn) t1.start() print('结束了') if __name__ == '__main__': main()
# t1进程是守护进程:t1进程会因为主进程的结束,被强制结束 # ---------------------------------------------------------- from threading import Thread def fn(): while True: print(1) def main(): t1 = Thread(target=fn) t1.start() t1.setDaemon(True) # 设置为True时,说明此进程是"守护进程"【默认是False】 print('结束了') if __name__ == '__main__': main()
(3) 队列在线程之间的通信
-
一定要在后面的代码中仔细思考一下,尤其是阶段5的代码
# Queue.join() ''' 当生产者生产结束时,先阻塞生产者线程,只有当消费者发出已经消费完队中产品时,才解除阻塞 ''' # Queue.task_done() ''' 消费者消费一个队中的产品,就向生产者发送一次信息 当消费完队中信息之后,也向生产者发送信息,并发出已经消费完的提示,提示生产者可以解除生产者线程的阻塞了 '''
二、生产者与消费者模式
- 该模式有两个对象,分别是生产者、消费者,两者同时进行操作
- 下面分为5个阶段,慢慢讲解
阶段1:消费者线程的阻塞
from queue import Queue from threading import Thread # 生产者 def produce(q): for i in range(1, 11): q.put(i) print(f'生产产品——{i}') # 消费者 def consumer(q): while True: tmp = q.get() print(f'消费产品——{tmp}') # 主进程 def main(): q = Queue() pro = Thread(target=produce, args=(q,)) con = Thread(target=consumer, args=(q,)) pro.start() con.start() if __name__ == '__main__': main()
- 分析
- 在主线程中创建并开启生产者线程和消费者线程,生产者共生产10个产品
- 生产者生产产品的同时,消费者在调用
q.get()
方法消费产品,当生产者把产品全部生产完之后,生产者线程结束,消费者继续调用q.get()
方法消费产品,当没有产品可以消费时,消费者再调用q.get()
时,会导致消费者线程进入阻塞状态,直到再往里面加数据为止,但是生产者已经把产品生产完,不会再生产了,所以消费者线程会一直处于阻塞状态
-
阶段2:产品消费不完
from queue import Queue from threading import Thread # 生产者 def produce(q): for i in range(1, 11): q.put(i) print(f'生产产品——{i}') # 消费者 def consumer(q): while True: tmp = q.get() print(f'消费产品——{tmp}') # 主进程 def main(): q = Queue() pro = Thread(target=produce, args=(q,)) con = Thread(target=consumer, args=(q,)) con.setDaemon(True) # 设置守护线程 pro.start() con.start() if __name__ == '__main__': main()
- 针对阶段1的代码,只添加了一行代码,将消费者线程为 "守护线程"即可
- 分析
- 当生产者将产品全部生产完,生产者线程结束,然后主线程也结束了,接着
消费者线程
作为守护线程被强制退出,解决了消费者线程阻塞的问题 - 但是,由下图可看到,虽然解决了消费者线程阻塞的问题,但是消费者本次只消费了5个产品,生产者所生产的产品没有被消费完,这个问题请看
阶段3
-
- 当生产者将产品全部生产完,生产者线程结束,然后主线程也结束了,接着
阶段3:小完美的代码
from queue import Queue from threading import Thread # 生产者 def produce(q): for i in range(1, 11): q.put(i) print(f'生产产品——{i}') q.join() # 阻塞生产者线程,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞 # 消费者 def consumer(q): while True: tmp = q.get() print(f'消费产品——{tmp}') q.task_done() # 向生产者发送消息,告诉生产者我已经消费了一个产品 # 主进程 def main(): q = Queue() pro = Thread(target=produce, args=(q,)) con = Thread(target=consumer, args=(q,)) con.setDaemon(True) pro.start() con.start() if __name__ == '__main__': main()
-
针对阶段2仅添加了两行代码
- q.join()
- q.task_done()
-
分析:
- 当生产者将产品全部生产完,生产者线程因为执行了
q.join()
而被阻塞,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞 - 而消费者线程会边消费产品,边执行
q.task_done()
给生产者线程发送消息,直到消费完全部的产品时,在给生产者发送消息时,会通知生产者已经消费完全部的产品 - 此时生产者接收到消费完全部产品的信息,阻塞被解除,生产者线程结束
- 然后主线程结束
- 再接着,由于消费者线程的守护线程,被强制关闭
-
- 当生产者将产品全部生产完,生产者线程因为执行了
阶段4:有关线程执行顺序的问题
from queue import Queue from threading import Thread # 生产者 def produce(q): for i in range(1, 11): q.put(i) print(f'生产产品——{i}') q.join() # 阻塞生产者线程,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞 # 消费者 def consumer(q): while True: tmp = q.get() print(f'消费产品——{tmp}') q.task_done() # 向生产者发送消息,告诉生产者我已经消费了一个产品 # 主进程 def main(): q = Queue() pro = Thread(target=produce, args=(q,)) con = Thread(target=consumer, args=(q,)) con.setDaemon(True) pro.start() con.start() print('结束了') if __name__ == '__main__': main()
-
与阶段3相比,仅在主线程中添加一行输出语句
- 分析
- 我们想要的是两个子线程结束之后,再打印输出
生产者和消费者全部结束了呀!!!
,但是很明显,结果不是这样的,下面开始分析 - 程序中有1个主线程、2个子线程,三者会同时执行,所以主线程中的输出语句的执行时间是随机的,故输出的位置也是随机的
- 解决方法:阻塞当前线程,也就是阻塞主线程,见阶段5
-
- 我们想要的是两个子线程结束之后,再打印输出
阶段5:线程执行顺序问题的解决
from queue import Queue from threading import Thread # 生产者 def produce(q): for i in range(1, 11): q.put(i) print(f'生产产品——{i}') q.join() # 阻塞生产者线程,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞 # 消费者 def consumer(q): while True: tmp = q.get() print(f'消费产品——{tmp}') q.task_done() # 向生产者发送消息,告诉生产者我已经消费了一个产品 # 主进程 def main(): q = Queue() pro = Thread(target=produce, args=(q,)) con = Thread(target=consumer, args=(q,)) con.setDaemon(True) pro.start() con.start() pro.join() # 阻塞当前所在的线程 print('结束了') if __name__ == '__main__': main()
-
与阶段4相比,仅添加一句代码,以达到阻塞主线程的需求
- 分析:
- 程序中有1个主线程、2个子线程,三者会同时执行
- 主线程中执行到
pro.join()
时,当前线程被阻塞,也即主线程被阻塞,知道生产完全部产品,消费完全部产品,生产者线程结束 - 主线程才被解除阻塞
- 然后主线程结束,消费者线程被强制结束
-
三、参考
「其他文章」
- 模板化的封装,降低业务代码开发
- 分享一个 SpringCloud Feign 中所埋藏的坑
- MySQL 事务常见面试题总结 | JavaGuide 审核中
- 类型安全的 Go HTTP 请求
- 从几次事故引起的对项目质量保障的思考
- 联盟链 Hyperledger Fabric 应用场景
- 上半年最中意的 GitHub 更新「GitHub 热点速览 v.22.21」
- 为什么我写了路由懒加载但代码却没有分割?
- 这个设计原则,你认同吗?
- SpringCloud基础概念学习笔记(Eureka、Ribbon、Feign、Zuul)
- 自动微分原理
- layui数据表格搜索
- Python 中的内存管理
- spring 配置文件 --bean
- 【leetcode】239. 滑动窗口最大值
- Spring 源码(17)Spring Bean的创建过程(8)Bean的初始化
- SpringBoot进阶教程(七十四)整合ELK
- 链表的基本操作和高频算法题
- 【python】python连接Oracle数据库
- Python技法:浮点数取整、格式化和NaN处理