python模块详解 | asyncio
why async?
堵塞
阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续处理其他的事情,则称该程序在该操作上是阻塞的。
当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等待,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。这就是异步操作的便捷之处。
几个相关的概念:
- 线程(thread)
- 进程(process)
- 协程(coroutine)
- 并行(parallel)
- 并发(concurrency)
- 异步(async)
协程
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程。协程的本质是个单线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
(协程不是计算机提供,而是程序员人为创造。)
优点
-
方便切换控制流,简化编程模型
-
无需线程上下文切换的开销
计算密集型的操作,利用多线程来回切换执行,没有任何意义,来回切换并保存状态,反倒会降低性能。
-
高并发+高扩展性+低成本
一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
-
无需原子操作锁定及同步的开销
“原子操作(atomic operation)是不需要synchronized”,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
-
总结下来,协程在线程基础上全面优化;在网络请求密集场景下尤为适用。
缺点
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。比如执行 命令行任务 时导致的堵塞。
- 总结下来,协程作用不大的场景,使用多进程即可解决。
实现协程的方法
- greenlet,早期模块。
yield
关键字。asyncio.coroutine
装饰器(py3.4提供,至py3.8被移除)asyncio
、async
、await
关键字(py3.5)【推荐】
why asyncio?
asyncio
官方文档 - https://docs.python.org/zh-cn/3/library/asyncio-task.html
从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。
Python 中使用协程最常用的库莫过于 asyncio,首先我们需要了解下面几个概念:
-
event_loop
:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。 -
coroutine
:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。 -
task
:任务,它是对协程对象的进一步封装,包含了任务的各个状态。 -
future
:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。
async & await
async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。
Python3.8之后 @asyncio.coroutine
装饰器就会被移除,推荐使用async & awit 关键字实现协程代码。
await + 可等待的对象(协程对象、Future对象、Task对象 -> IO等待)
使用async
定义的方法会变成一个无法直接执行的「coroutine对象」,必须将其注册到事件循环中,或使用asyncio.run()
,才可以执行。
要实现异步,需要了解一下 await 的用法,使用 await 可以「将耗时等待的操作挂起,让出控制权」。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。
更多参考 - /article/2021/2/async/
Coroutine / Task / Future
如果一个对象可以在
await
语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。可等待 对象有三种主要类型: 协程, 任务 和 Future.
———— 官方文档
Coroutine 对象
# 1. 创建协程函数
async def main():
...
# 2. 创建协程对象
coroutine = main()
# 3. 运行协程对象
await coroutine # 协程函数内部
asyncio.run(coroutine) # 协程函数外部
asyncio.get_event_loop().run_until_complete(coroutine) # 协程外部
Task 对象
任务 被用来“并行的”调度协程
当一个协程通过
asyncio.create_task()
等函数被封装为一个 任务,该协程会被自动调度执行———— 官方文档
白话:在事件循环中添加多个任务的。
1. 创建task
-
asyncio.create_task(coro, *, name=None)
/asyncio.ensure_future()
create_task()
方法在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的方法 -ensure_future()
。create_task()
方法,name 不为None
,它将使用Task.set_name()
来设为任务的名称。create_task()
方法会在get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发RuntimeError
。
-
loop.create_task()
/loop.ensure_future()
task用于并发调度协程,创建Task对象,可以让协程加入事件循环中等待被调度执行。不建议手动实例化 Task 对象。
⚠️注意:create_task()
方法需要在协程函数内使用,否则会引发 RuntimeError
。
2. 收集tasks任务结果
-
asyncio.wait( tasks )
done, pending = asyncio.wait( tasks ) for task in done: t_name = task.get_name() t_result = task.result()
- done - 已完成的协程
Task
,set类型 - pending - 超时未完成的协程
Task
,set类型
- done - 已完成的协程
-
asyncio.gather( *tasks )
res = asyncio.gather(task1, task2) # 或 res = asyncio.gather(*tasks) print(res) # res: list
3. 执行task
-
asyncio.get_event_loop().run_until_complete()
- 版本3.7之前 -
asyncio.run(*coro*, ***, *debug=False*)
- 版本3.7之后执行 coroutine coro 并返回结果。
此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。
当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
如果 debug 为
True
,事件循环将以调试模式运行。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
4. task & tasks
-
task
# 1. 创建task task = coroutine # coroutine:协程对象 task = asyncio.create_task(coroutine) # py3.7之后 task = asyncio.ensure_future(coroutine) # py3.7之前 task = loop.create_task(coroutine) # loop:事件循环 # 2. 运行task # 2.1(在协程函数内部) await task # 2.2(在协程函数外部) loop = asyncio.get_event_loop() loop.run_until_complete(task) # 2.3(在协程函数外部) asyncio.run(task)
-
tasks
# 1. 创建tasks tasks = [task1, task2] # 2. 运行tasks # 2.1(在协程函数内部) await asyncio.wait(tasks) # 2.2(在协程函数外部) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # 2.3(在协程函数外部) tasks = [coroutine1, coroutine2] asyncio.run( asyncio.wait(tasks) )
Future 对象
Future
是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
———— 官方文档
Task继承Future,Task对象内部await结果的处理基于Future对象来的。
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不干。
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果则会一直等下去。
await fut
asyncio.run( main() )
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("666")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
fut = loop.create_future()
# 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
# 即手动设置future任务的最终结果,那么fut就可以结束了。
await loop.create_task( set_after(fut) )
# 等待 Future对象获取 最终结果,否则一直等下去
data = await fut
print(data)
asyncio.run( main() )
# 1. 创建future
asyncio.get_running_loop().create_future()
基于线程池和进程池的异步
使用场景:
- 使用异步编程时,遇到某个第三方模块不支持基于协程的异步。
- 在使用协程的基础上,还需要更快的执行效率。
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func():
...
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
pool = ProcessPoolExecutor(max_workers=5)
异步迭代器
什么是异步迭代器?
实现了 __aiter__()
和 __anext__()
方法的对象。__anext__
必须返回一个 awaitable 对象。async for
会处理异步迭代器的 __anext__()
方法所返回的可等待对象,直到其引发一个 StopAsyncIteration
异常。由 PEP 492 引入。
什么是异步可迭代对象?
可在 async for
语句中被使用的对象。必须通过它的 __aiter__()
方法返回一个 asynchronous iterator。由 PEP 492 引入。
import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = conn
async def __aenter__(self):
# 例:异步链接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# 例:异步关闭数据库链接
await asyncio.sleep(1)
async def do_something(self):
# 例:异步操作数据库
return 666
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run( func() )
异步上下文管理器
此种对象通过定义 __aenter__()
和 __aexit__()
方法来对 async with
语句中的环境进行控制。由 PEP 492 引入。
import asyncio
class Reader(object):
""" 自定义异步迭代器(同时也是异步可迭代对象) """
def __init__(self):
self.count = 0
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
async def func():
async for item in Reader():
print(item)
asyncio.run( func() )
uvloop
uvloop是asyncio的事件循环的替代方案。比默认asyncio的事件循环性能要高许多。
pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码,与之前写的代码一致。
# 内部的事件循环自动化会变为uvloop
asyncio.run(...)
注:asgi -> uvicorn
内部使用的就是uvloop
休眠
coroutine asyncio.sleep
(delay, result=None, ***, loop=None)
- 阻塞 delay 指定的秒数。
- 如果指定了 result,则当协程完成时将其返回给调用者。
sleep()
总是会挂起当前任务,以允许其他任务运行。
Talk is cheap,show me the code.
-
多任务协程
import asyncio import requests import aiohttp import time async def get(url): session = aiohttp.ClientSession() response = await session.get(url) await response.text() await session.close() return response async def request(): url = "https://baidu.com" r = await get(url) return r.status_code start = time.time() tasks = [asyncio.ensure_future(request()) for _ in range(5)] # loop = asyncio.get_event_loop() # loop.run_until_complete(asyncio.wait(tasks)) # end = time.time() # https://www.huodongxing.com/events print(end-start)
- 使用
await
可以将「耗时等待的操作」挂起,让出控制权。当协程执行的时候遇到await
,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕,再进行下一个协程的执行 asyncio.ensure_future()
- 创建任务,并将多个任务列为列表- 将任务列表传递给
asyncio.wait()
方法(多个任务时才需要这个方法) loop.run_until_complete()
- 事件循环loop接收任务,并启动
- 使用
-
测试一下速度
import asyncio import aiohttp import time start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) await response.text() await session.close() return response async def request(): url = 'https://static4.scrape.center/' print('Waiting for', url) response = await get(url) print('Get response from', url, 'response', response) tasks = [asyncio.ensure_future(request()) for _ in range(10)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start)
-
再来一个万能架构(loop版本)
import asyncio import aiohttp async def fetch(session, url) -> str: async with session.get(url) as resp: return await resp.text() async def parse(text) -> str: return text async def process(session, url): return await parse(await fetch(session, url)) async def main(): async with aiohttp.ClientSession() as session: urls = ['','',''] tasks = [asyncio.ensure_future(process(session, url)) for url in urls] return asyncio.wait(tasks) loop = asyncio.get_event_loop() loop.run_until_complete(main())
-
再来一个万能架构(asyncio.run()版本)
import asyncio import aiohttp async def fetch(url) -> str: async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.text() async def parse(text) -> str: return text async def process(url): return await parse(await fetch(url)) async def main(): urls = ['', '', ''] tasks = [asyncio.ensure_future(spider(url)) for url in urls] return await asyncio.wait(tasks) done, pending = asyncio.run(main()) # 此方法由 python3.7 提供