python 多线程
2022年5月24日大约 6 分钟约 1892 字
多线程
threading python官方教程concurrent.futures python官方教程multiprocessing.dummy python官方教程
- 对于io操作来说,多线程和多进程性能差别不大,甚至多线程性能更好(瓶颈在io不在CPU)
- 对操作系统来说,线程调度比进程调度更加轻量,成本更低。
GIL(全局解释器锁,global interpreter lock)
cpython会在多线程编程时,在全局加一把锁。只允许同一时刻只有一个线程在一个cpu上执行字节码,无法将多个线程映射到多个CPU上执行。
GIL会根据执行的字节码行数以及时间片自动释放;GIL还会在遇到io操作的时候主动释放。
字节码查看命令
import dis
def add(a):
a = a+1
return a
print(dis.dis(add))
使用面向对象编程(建议主要使用)
# coding:utf-8
import time
import threading
class GetDetailHtmlThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
# 当线程start时执行的逻辑
def run(self):
print("开始获取网站内容")
time.sleep(2)
print("已获取网站内容")
class GetDetailUrlThread(threading.Thread):
def run(self):
print("开始爬取URL")
time.sleep(2)
print("爬取URL结束")
if __name__ == "__main__":
thread1 = GetDetailHtmlThread("get_detail_html")
thread2 = GetDetailUrlThread()
start_time = time.time() # 开始的时间
thread1.setDaemon(True) # 设置成守护进程,当主线程退出时该线程立即强制退出(默认是所有线程运行完,程序才结束)
thread2.setDaemon(True)
thread1.start() # 开始运行该子线程
thread2.start() # 开始运行该子线程
thread1.join() # 将该子线程加入此位置阻塞,等待该子线程执行结束。
thread2.join()
print("最后时间: {}".format(time.time() - start_time))
使用函数式编程
# coding:utf-8
import time
import threading
def get_detail_html(url):
print("开始获取网站内容")
time.sleep(2)
print("已获取网站内容")
def get_detail_url(url):
print("开始爬取URL")
time.sleep(2)
print("爬取URL结束")
if __name__ == "__main__":
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time() # 开始的时间
thread1.setDaemon(True) # 设置成守护进程,当主线程退出时该线程立即强制退出(默认是所有线程运行完,程序才结束)
thread2.setDaemon(True)
thread1.start() # 开始运行该子线程
thread2.start() # 开始运行该子线程
thread1.join() # 将该子线程加入此位置阻塞,等待该子线程执行结束。
thread2.join()
print("最后时间: {}".format(time.time() - start_time))
线程间通信(全局变量、Queue、线程锁、线程池)
ThreadPoolExecutor线程池
简单示例:
# coding:utf-8
from concurrent import futures # 用于线程池和进程池编程,是个顶层包,该包使线程和进程编码接口一致
import time
# 之前用semaphore锁来控制子线程数量,现在可以用线程池简化操作
def get_html(times):
time.sleep(times)
return times
executor = futures.ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
# 批量提交任务
all_task = [executor.submit(get_html, (url)) for url in urls]
# futures.wait(all_task) # 阻塞,等待任务完成
# futures.wait(all_task, return_when=futures.FIRST_COMPLETED) # 阻塞,只等待有其中一个任务完成就往下进行
# --------批量获取已经成功的任务的结果--------
# 方法一:
for data in executor.map(get_html, urls):
print("获取{}页面成功".format(data)) # 数据的返回顺序和提供的urls列表顺序是一致的
# 方法二:(推荐使用)
for future in futures.as_completed(all_task):
data = future.result() # 谁先完成就处理谁
print("获取{}页面成功".format(data))
# --------单个任务的提交和操作---------------
# 通过submit函数提交任务
# task1 = executor.submit(get_html, (3)) # 提交到池后如果任务没有达到最大数量则子线程立即开始启动。非阻塞,立即返回
# task2 = executor.submit(get_html, (2))
# print(task1.done()) # 判断任务是否执行完成
# print(task2.cancel()) # 取消任务。只有任务没有被启动才可以被取消。
# print(task1.result()) # 获取子线程的返回结果,阻塞的方法
使用Queue通信
Queue使用方法
#coding: utf-8
import queue
url_queue = queue.Queue(maxsize=1000) # Queue本身就是线程安全的,无需加锁。
url_queue.get() # 阻塞的方法,如果队列为空会等待数据
url_queue.put(data) # 阻塞的方法,如果队列已满会等待
url_queue.put_nowait() # 异步的方法,不等待执行完成就返回
url_queue.get_nowait()
url_queue.qsize() # 获取队列长度
url_queue.empty() # 判断队列是否为空
url_queue.full() # 判断队列是否已满
url_queue.join() # 将queue加入到该位置阻塞,等待任务队列完成
url_queue.task_done() # queue任务完成
Lock锁、RLock锁、Condition条件锁、Semaphore锁
# --------------Lock和RLock区别
# 在同一个线程里面,RLock可以连续调用多次acquire,一定注意acquire次数和release次数需相等;不同线程还是只能请求一次acquire。
# ---------------Condition条件锁
# 当两个子线程需要相互通知再执行时可以使用此锁。
# Condition实现了__enter__和__exit__方法,说明可以用with语句调用
with condition:
condition.notify() # 通知
condition.wait() # 等待通知,这两个方法必须在with语句下面调用。
# condition的with语句已经自动的实现了acquire()和release(),也可以去掉with手动在头部和尾部加上condition.acquire()和condition.release()是一样的。
# condition有两层锁,一层with语句condition全局锁,一层wait锁;
# 调用wait方法会释放wait锁,并分配一个wait锁放入condition一个双端队列中等待notify方法释放唤醒。
# 调用notify方法会从双端队列弹出锁并释放,让另个执行了wait操作的子线程可以继续执行。
# 实验结果:同一个线程下使用notify方法并不能解锁当前wait方法,只能解锁另一个线程的wait方法。
# 使用condition时候,子线程启动顺序重要,应该首先让wait语句在前面的线程先start启动并执行到wait方法等待,来等待另一个子线程的notify通知。
# -------------Semaphore锁
# 是用于控制进入数量的锁
# semaphore = threading.Semaphore(3) # 实例化一个数量为3的锁
# semaphore.accquire() # 将数量减一,如果数量用完就阻塞等待锁
# semaphore.release() # 将数量加一
使用Semaphore控制子线程启动的数量:
#做爬虫
import threading
import time
class HtmlSpider(threading.Thread):
"""
获取URL的实际内容
"""
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("got html text success")
self.sem.release() # 实际业务操作完成,子进程结束前把semaphore锁加回来。
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire() # 请求一把锁,来生成一个html下载器子线程
html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
html_thread.start()
if __name__ == "__main__":
sem = threading.Semaphore(3)
url_producer = UrlProducer(sem)
url_producer.start()
使用全局变量通信
全局变量可以直接在线程间通信,直接使用即可(注意加锁),以下是示例
# vim share_variables.py
detail_url_list = []
# vim python_thread.py
# coding:utf-8
import time
import threading
from chapter import share_variables # 不可以直接把变量import进来,否则线程对该变量的修改看不到!
# detail_url_list = [] # 使用全局变量做为共享变量,也可以放到其他配置文件里面方便管理
url_lock = threading.Lock() # url列表的锁,用锁会影响性能
class GetDetailHtmlThread(threading.Thread):
def __init__(self, detail_url_list): # list和dict对象传进来的是引用。
super().__init__()
self.detail_url_list = detail_url_list
# 当线程start时执行的逻辑
def run(self):
global url_lock
while True:
if len(self.detail_url_list):
url_lock.acquire() # 请求获得锁
url = self.detail_url_list.pop()
url_lock.release() # 释放锁
print("开始获取网站内容")
time.sleep(2)
print("已获取网站内容")
class GetDetailUrlThread(threading.Thread):
def run(self):
# global detail_url_list
detail_url_list = share_variables.detail_url_list
global url_lock
while True:
print("开始爬取列表URL")
time.sleep(4)
url_lock.acquire() # 请求获得锁
for i in range(10):
detail_url_list.append("https://www.baidu.com/{id}".format(id=i))
url_lock.release() # 释放锁
print("爬取URL结束")
if __name__ == "__main__":
detail_url_thread = GetDetailUrlThread()
detail_url_thread.start()
for i in range(10): # 下载器比爬取器慢,所以要创建10个下载器线程来匹配
html_thread = GetDetailHtmlThread(share_variables.detail_url_list)
html_thread.start()