python asyncio模块详解
- Python并发编程之从生成器使用入门协程(七)
- Python并发编程之深入理解yield from语法(八)
- Python并发编程之初识异步IO框架:asyncio 上篇(九)
- Python并发编程之学习异步IO框架:asyncio 中篇(十)
- Python并发编程之实战异步IO框架:asyncio 下篇(十一)
- 廖雪峰的异步IO
- Python-aiohttp百万并发
- 异步爬虫: async/await 与 aiohttp的使用,以及例子
- aiohttp 简易使用教程
- Uvloop 让网络飞一会儿
- Uvloop英文使用文档
- aiohttp官方英文文档
类 | 使用方法 | 解释 | 实现方法 | |
---|---|---|---|---|
可迭代的对象 | collections.abc.Iterable | for循环迭代 | 字符串、列表、字典、deque。可迭代的对象都不是迭代器 | 内部实现 __iter__ 这个魔术方法 |
迭代器 | collections.abc.Iterator | for循环迭代 、next() | 可以不用for循环,而用next()函数迭代的叫迭代器 | 在可迭代的对象基础上再实现的__next__ 方法 |
生成器 | collections.abc.Generator | for循环 、next() 、.send(None) | yield返回值并阻塞等待,节省内存,实现异步编程 | 在迭代器的基础上再实现了yield |
协程 | collections.abc.Coroutine | 协程对象需要注册到事件循环,由事件循环调用 | 函数前面加上async 关键字 |
可迭代对象和迭代器,是将所有的值都生成存放在内存中,而
生成器
则是需要元素才临时生成,节省时间,节省空间
生成器的嵌套
yield from 两种用法
yield from
后面加上可迭代对象
他可以把可迭代对象里的每个元素一个一个的yield出来
yield from
后面加上生成器
(生成器的嵌套)- 只起一个桥梁作用,在调用方与子生成器之间建立一个
双向通道
(通过send()
直接发送消息给子生成器,而子生成器yield的值,也是直接返回给调用方 ) - 子生成器
yield
值直接被调用方接收,子生成器最后return
的值才被父生成器接收 yield from
帮我们做了很多的异常处理,而且全面
- 只起一个桥梁作用,在调用方与子生成器之间建立一个
生成器是协程的基础
- 生成器可以直接使用
@asyncio.coroutine
标记成协程使用 yield from
==await
,都能实现暂停的效果(挂起阻塞的异步调用)
- 生成器可以直接使用
协程
协程是一种编程模型
Python
中协程
的诞生是由于向暂停的生成器发送信息的功能(.send(None)
)进入python2.5后,也就是说生成器是协程的基础协程是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不同位置暂停或开始执行程序。
协程通过使用
yield
暂停生成器,可以将程序的执行流程交给其他的子程序,从而实现不同子程序的之间的交替执行。
asyncio.sleep(n)
,这货是asyncio
自带的工具函数,他可以模拟IO阻塞,他返回的是一个协程对象。Task是Future的子类
将生成器标记成协程
只是把生成器标记成协程,当成协程使用而已。并不是成为协程,本质还是一个生成器。
import asyncio
from collections.abc import Generator, Coroutine
'''
只要在一个生成器函数头部用上 @asyncio.coroutine 装饰器
就能将这个函数对象,【标记】为协程对象。注意这里是【标记】,划重点。
实际上,它的本质还是一个生成器。
标记后,它实际上已经可以当成协程使用。后面会介绍。
'''
@asyncio.coroutine
def hello():
# 异步调用asyncio.sleep(1):
yield from asyncio.sleep(1)
if __name__ == '__main__':
coroutine = hello()
print(isinstance(coroutine, Generator)) # True
print(isinstance(coroutine, Coroutine)) # False
协程完整的工作流程
协程完整的工作流程是这样的
- 定义/创建协程对象
- 将协程转为task任务
- 定义事件循环对象容器
- 将task任务扔进事件循环对象中触发
import asyncio
async def hello(name):
print('Hello,', name)
# 定义协程对象
coroutine = hello("World")
# 定义事件循环对象容器
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)
# 将协程转为task任务
task = loop.create_task(coroutine)
# 将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)
输出结果,当然显而易见
await与yield对比
效果对比
await
和yield
在效果上一样,都能实现暂停的效果。
功能对比
但是在功能上不兼容 (不能在生成器中使用await
,不能在async
定义的协程中使用yield
)
使用对比
yield from
后面可接可迭代对象
,也可接future对象
/协程对象;await
后面必须要接future对象
/协程对象
绑定回调函数
异步IO的实现原理,就是在IO高的地方挂起,等IO结束后,再继续执行。在绝大部分时候,我们后续的代码的执行是需要依赖IO的返回值的,这就要用到回调了。
回调的实现,有两种,一种是绝大部分程序员喜欢的,利用的同步编程实现的回调。 这就要求我们要能够有办法取得协程的await的返回值。
import asyncio
import time
async def _sleep(x):
time.sleep(2)
return '暂停了{}秒!'.format(x)
coroutine = _sleep(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
# task.result() 可以取得返回结果
print('返回结果:{}'.format(task.result()))
还有一种是通过asyncio
自带的添加回调函数功能来实现。
import time
import asyncio
async def _sleep(x):
time.sleep(2)
return '暂停了{}秒!'.format(x)
def callback(future):
print('这里是回调函数,获取返回结果是:', future.result())
coroutine = _sleep(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
# 添加回调函数
task.add_done_callback(callback)
loop.run_until_complete(task)
协程中的并发
每当有任务阻塞的时候就await,然后其他协程继续工作
import asyncio
# 协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x) # 挂起一个阻塞的协程
return 'Done after {}s'.format(x)
# 协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转成task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 协程列表的注册
#loop.run_until_complete(asyncio.gather(*tasks)) # 使用asyncio.gather()注册
for task in tasks:
print('Task ret: ', task.result())
协程中的嵌套
把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程
import asyncio
# 用于内部的协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
# 外部的协程函数
async def main():
# 创建三个协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转为task,并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
# 【重点】:await 一个task列表(协程)
# dones:表示已经完成的任务
# pendings:表示未完成的任务
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
# 如果使用的是asyncio.gather(),是这么用的注意这边返回结果,与await不一样
#results = await asyncio.gather(*tasks)
#for result in results:
#print('Task ret: ', result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
gather与wait
接收参数的方式
asyncio.wait
接收的tasks,必须是一个list对象,这个list对象里,存放多个task或协程对象。
asyncio.gather
接收的tasks的时候,前面加*
。可以嵌套也可以直接带入进参数
返回的结果不同
asyncio.wait
asyncio.wait
返回dones
和pendings
dones
:表示已经完成的任务pendings
:表示未完成的任务
如果我们需要获取,运行结果,需要手工去收集获取。
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
asyncio.gather
asyncio.gather
它会把值直接返回给我们,不需要手工去收集。
results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)
wait有控制功能
import asyncio
import random
async def coro(tag):
await asyncio.sleep(random.uniform(0.5, 5))
loop = asyncio.get_event_loop()
tasks = [coro(i) for i in range(1, 11)]
# 【控制运行任务数】:运行第一个任务就返回
# FIRST_COMPLETED :第一个任务完全返回
# FIRST_EXCEPTION:产生第一个异常返回
# ALL_COMPLETED:所有任务完成返回 (默认选项)
dones, pendings = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("第一次完成的任务数:", len(dones))
# 【控制时间】:运行一秒后,就返回
dones2, pendings2 = loop.run_until_complete(
asyncio.wait(pendings, timeout=1))
print("第二次完成的任务数:", len(dones2))
# 【默认】:所有任务完成后返回
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))
print("第三次完成的任务数:", len(dones3))
loop.close()
输出结果
第一次完成的任务数: 1
第二次完成的任务数: 4
第三次完成的任务数: 5
动态添加协程
如何实现呢,有两种方法:
- 主线程是
同步
的
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
new_loop = asyncio.new_event_loop()
# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
def do_sleep(x, queue, msg=""):
time.sleep(x)
queue.put(msg)
queue = Queue()
print(time.ctime())
# 动态添加两个协程
# 这种方法,在主线程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")
while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())
由于是同步的,所以总共耗时6+3=9
秒
- 主线程是
异步
的,这是重点,一定要掌握。。
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
new_loop = asyncio.new_event_loop()
# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)
queue = Queue()
print(time.ctime())
# 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)
while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())
由于是异步的,所以总共耗时max(6, 3)=6
秒