python模块详解 | asyncio

why async?

堵塞

阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续处理其他的事情,则称该程序在该操作上是阻塞的。

当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等待,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。这就是异步操作的便捷之处。

几个相关的概念:

  • 线程(thread)
  • 进程(process)
  • 协程(coroutine)
  • 并行(parallel)
  • 并发(concurrency)
  • 异步(async)

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程。协程的本质是个单线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

(协程不是计算机提供,而是程序员人为创造。)

优点

  • 方便切换控制流,简化编程模型

  • 无需线程上下文切换的开销

    计算密集型的操作,利用多线程来回切换执行,没有任何意义,来回切换并保存状态,反倒会降低性能。

  • 高并发+高扩展性+低成本

    一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

  • 无需原子操作锁定及同步的开销

    “原子操作(atomic operation)是不需要synchronized”,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

  • 总结下来,协程在线程基础上全面优化;在网络请求密集场景下尤为适用。

缺点

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。比如执行 命令行任务 时导致的堵塞。
  • 总结下来,协程作用不大的场景,使用多进程即可解决。

实现协程的方法

  • greenlet,早期模块。
  • yield关键字。
  • asyncio.coroutine装饰器(py3.4提供,至py3.8被移除)
  • asyncioasyncawait关键字(py3.5)【推荐】

why asyncio?

asyncio

官方文档 - https://docs.python.org/zh-cn/3/library/asyncio-task.html

从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。

Python 中使用协程最常用的库莫过于 asyncio,首先我们需要了解下面几个概念:

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。

  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。

  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

async & await

async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

Python3.8之后 @asyncio.coroutine 装饰器就会被移除,推荐使用async & awit 关键字实现协程代码。

await + 可等待的对象(协程对象、Future对象、Task对象 -> IO等待)

使用async定义的方法会变成一个无法直接执行的「coroutine对象」,必须将其注册到事件循环中,或使用asyncio.run(),才可以执行。

要实现异步,需要了解一下 await 的用法,使用 await 可以「将耗时等待的操作挂起,让出控制权」。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。

更多参考 - /article/2021/2/async/

Coroutine / Task / Future

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程, 任务Future.

———— 官方文档

Coroutine 对象

# 1. 创建协程函数
async def main():
  ...
  
# 2. 创建协程对象
coroutine = main()

# 3. 运行协程对象
await coroutine  # 协程函数内部
asyncio.run(coroutine)  # 协程函数外部
asyncio.get_event_loop().run_until_complete(coroutine)  # 协程外部

Task 对象

任务 被用来“并行的”调度协程

当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行

———— 官方文档

白话:在事件循环中添加多个任务的。

1. 创建task

  • asyncio.create_task(coro, *, name=None) / asyncio.ensure_future()

    • create_task() 方法在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的方法 - ensure_future()
    • create_task() 方法,name 不为 None,它将使用 Task.set_name() 来设为任务的名称。
    • create_task() 方法会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError
  • loop.create_task() / loop.ensure_future()

task用于并发调度协程,创建Task对象,可以让协程加入事件循环中等待被调度执行。不建议手动实例化 Task 对象。

⚠️注意:create_task()方法需要在协程函数内使用,否则会引发 RuntimeError

2. 收集tasks任务结果

  • asyncio.wait( tasks )

    done, pending = asyncio.wait( tasks )  
    for task in done:
      t_name = task.get_name() 
      t_result = task.result()
    
    • done - 已完成的协程 Task,set类型
    • pending - 超时未完成的协程 Task,set类型
  • asyncio.gather( *tasks )

    res = asyncio.gather(task1, task2)
    # 或
    res = asyncio.gather(*tasks)
    print(res)  # res: list
    

3. 执行task

  • asyncio.get_event_loop().run_until_complete() - 版本3.7之前

  • asyncio.run(*coro*, ***, *debug=False*) - 版本3.7之后

    执行 coroutine coro 并返回结果。

    此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。

    当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

    如果 debugTrue,事件循环将以调试模式运行。

    此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

4. task & tasks

  • task

    # 1. 创建task
    task = coroutine # coroutine:协程对象
    task = asyncio.create_task(coroutine) # py3.7之后
    task = asyncio.ensure_future(coroutine) # py3.7之前
    task = loop.create_task(coroutine) # loop:事件循环
    
    # 2. 运行task
    # 2.1(在协程函数内部)
    await task
    # 2.2(在协程函数外部)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    # 2.3(在协程函数外部)
    asyncio.run(task)
    
  • tasks

    # 1. 创建tasks
    tasks = [task1, task2]
    
    # 2. 运行tasks
    # 2.1(在协程函数内部)
    await asyncio.wait(tasks)
    # 2.2(在协程函数外部)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # 2.3(在协程函数外部)
    tasks = [coroutine1, coroutine2]
    asyncio.run( asyncio.wait(tasks) )
    

Future 对象

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

———— 官方文档

Task继承Future,Task对象内部await结果的处理基于Future对象来的。

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 创建一个任务(Future对象),这个任务什么都不干。
    fut = loop.create_future()

    # 等待任务最终结果(Future对象),没有结果则会一直等下去。
    await fut

asyncio.run( main() )
import asyncio


async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")


async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()

    # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
    fut = loop.create_future()

    # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
    # 即手动设置future任务的最终结果,那么fut就可以结束了。
    await loop.create_task(  set_after(fut) )

    # 等待 Future对象获取 最终结果,否则一直等下去
    data = await fut
    print(data)

asyncio.run( main() )
# 1. 创建future
asyncio.get_running_loop().create_future()

基于线程池和进程池的异步

使用场景:

  1. 使用异步编程时,遇到某个第三方模块不支持基于协程的异步。
  2. 在使用协程的基础上,还需要更快的执行效率。
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func():
  ...
  
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
pool = ProcessPoolExecutor(max_workers=5)

异步迭代器

什么是异步迭代器?

实现了 __aiter__()__anext__() 方法的对象。__anext__ 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

什么是异步可迭代对象?

可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。

import asyncio

class AsyncContextManager:
    def __init__(self):
        self.conn = conn
  
    async def __aenter__(self):
        # 例:异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # 例:异步关闭数据库链接
				await asyncio.sleep(1)
        
    async def do_something(self):
        # 例:异步操作数据库
        return 666

async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run( func() )

异步上下文管理器

此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。

import asyncio

class Reader(object):
    """ 自定义异步迭代器(同时也是异步可迭代对象) """

    def __init__(self):
        self.count = 0
        
    def __aiter__(self):
          return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val
      
    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count
    
async def func():
    async for item in Reader():
        print(item)
        
asyncio.run( func() )

uvloop

uvloop是asyncio的事件循环的替代方案。比默认asyncio的事件循环性能要高许多。

pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 编写asyncio的代码,与之前写的代码一致。	

# 内部的事件循环自动化会变为uvloop
asyncio.run(...)

注:asgi -> uvicorn 内部使用的就是uvloop

休眠

coroutine asyncio.sleep(delay, result=None, ***, loop=None)

  • 阻塞 delay 指定的秒数。
  • 如果指定了 result,则当协程完成时将其返回给调用者。
  • sleep() 总是会挂起当前任务,以允许其他任务运行。

Talk is cheap,show me the code.

  • 多任务协程

    import asyncio
    import requests
    import aiohttp
    import time
    
    async def get(url):
      session = aiohttp.ClientSession()
      response = await session.get(url)
      await response.text()
      await session.close()
      return response
    
    async def request():
      url = "https://baidu.com"
      r = await get(url)
      return r.status_code
    
    start = time.time()
    
    tasks = [asyncio.ensure_future(request()) for _ in range(5)] # 
    loop = asyncio.get_event_loop() # 
    loop.run_until_complete(asyncio.wait(tasks)) #
    
    end = time.time()
    # https://www.huodongxing.com/events
    print(end-start)
    
    1. 使用 await 可以将「耗时等待的操作」挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕,再进行下一个协程的执行
    2. asyncio.ensure_future() - 创建任务,并将多个任务列为列表
    3. 将任务列表传递给asyncio.wait()方法(多个任务时才需要这个方法)
    4. loop.run_until_complete() - 事件循环loop接收任务,并启动
  • 测试一下速度

    import asyncio
    import aiohttp
    import time
    
    start = time.time()
    
    async def get(url):
       session = aiohttp.ClientSession()
       response = await session.get(url)
       await response.text()
       await session.close()
       return response
    
    async def request():
       url = 'https://static4.scrape.center/'
       print('Waiting for', url)
       response = await get(url)
       print('Get response from', url, 'response', response)
    
    tasks = [asyncio.ensure_future(request()) for _ in range(10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('Cost time:', end - start)
    
  • 再来一个万能架构(loop版本)

    import asyncio
    import aiohttp
    
    async def fetch(session, url) -> str:
        async with session.get(url) as resp:
           return await resp.text()
    
    async def parse(text) -> str:
        return text
    
    async def process(session, url):
        return await parse(await fetch(session, url))
    
    async def main():
      	async with aiohttp.ClientSession() as session:
        	urls  = ['','','']
        	tasks = [asyncio.ensure_future(process(session, url)) for url in urls]
        	return asyncio.wait(tasks)
    
    loop  = asyncio.get_event_loop()
    loop.run_until_complete(main())
    
  • 再来一个万能架构(asyncio.run()版本)

    import asyncio
    import aiohttp
    
    async def fetch(url) -> str:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()
    
    async def parse(text) -> str:
        return text
    
    async def process(url):
        return await parse(await fetch(url))
    
    async def main():
    		urls  = ['', '', '']
    		tasks = [asyncio.ensure_future(spider(url)) for url in urls]
    		return await asyncio.wait(tasks)
    
    done, pending = asyncio.run(main())  # 此方法由 python3.7 提供 
    

一起来写协程吧,强势提高生产力!


4215 字