python模块详解 | aiohttp (client)

About

Key Features

客户端 - https://docs.aiohttp.org/en/stable/client.html#aiohttp-client

服务端 - https://docs.aiohttp.org/en/stable/web.html#aiohttp-web

asyncio - /article/2021/5/python-asyncio/

一个简单的示例

import asyncio, aiohttp, time

async def fetch(session,url):
    async with session.get(url, verify_ssl=False) as response:
        return await response.text(), response.status
async def main():
    async with aiohttp.ClientSession() as session:
        text, status_code = await fetch(session, "https://chenxuefan.cn")
        print(f"text:{text[:100]}")
        print(f"code:{status_code}")
        
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
  • 返回的是 coroutine 对象,前面就要加 await
  • response 调用 text() 方法,查询 API 可以发现其返回的是 coroutine 对象;而调用 status 参数不是。
  • with as 语句前面同样需要加 async 来修饰,在 Python 中,with as 语句用于声明一个上下文管理器,能够帮我们自动分配和释放资源,而在异步方法中,with as 前面加上 async 代表声明一个支持异步的上下文管理器。
  • 在 Python 3.7 及以后的版本中,可以使用 asyncio.run( main() ) 来代替最后的启动操作,不需要显式声明事件循环,run 方法内部会自动启动一个事件循环。但为了兼容更多的 Python 版本,依然还是显式声明了事件循环。

Request - 请求

各种各样的请求

session = aiohttp.ClientSession()

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

Params

params = {'key':'value'}
async with session.get('https://chenxuefan.cn/',params=params) as resp:
  	assert str(resp.url) == 'https://chenxuefan.cn/?key=value'

json

json = {'key':'value'}
async with aiohttp.ClientSession() as session:
    await session.post('https://chenxuefan.cn/', json=json)

data

# 写法1
data = {'key':'value'}  
# 写法2
data = {'file': open('report.xls', 'rb')}  
# 写法3
data = aiohttp.FormData()  
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

async with aiohttp.ClientSession() as session:
    await session.post('https://chenxuefan.cn/', data=data)

headers

headers={"Authorization": "Basic bG9naW46cGFzcw=="}
async with aiohttp.ClientSession(headers=headers) as session:
  ...

cookies

cookies = {'cookies_are': 'working'}
async with aiohttp.ClientSession(cookies=cookies) as session:
  ...

proxy

对于 aiohttp 来说,我们可以通过 proxy 参数直接设置即可,HTTP 代理设置如下:

import asyncio
import aiohttp

proxy = 'http://127.0.0.1:7890'

async def main():
   async with aiohttp.ClientSession() as session:
       async with session.get('https://httpbin.org/get', proxy=proxy) as response:
           print(await response.text())

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

如果代理有用户名密码,像 requests 一样,把 proxy 修改为如下内容:

proxy = 'http://username:password@127.0.0.1:7890'

这里只需要将 username 和 password 替换即可。

对于 SOCKS 代理,我们需要安装一个支持库,叫作 aiohttp-socks,安装命令如下:

pip3 install aiohttp-socks

可以借助于这个库的 ProxyConnector 来设置 SOCKS 代理,代码如下:

import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector

connector = ProxyConnector.from_url('socks5://127.0.0.1:7891')

async def main():
   async with aiohttp.ClientSession(connector=connector) as session:
       async with session.get('https://httpbin.org/get') as response:
           print(await response.text())

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

运行结果是一样的。

另外这个库还支持设置 SOCKS4、HTTP 代理以及对应的代理认证,可以参考其官方介绍。

Response - 响应

resp = session.get('https://chenxuefan.cn/',verify_ssl=False)
  1. resp.text() - 文本信息

  2. resp.read() - 二进制文本信息

  3. resp.json() - json信息

  4. resp.content.read() - 以字节流的方式读取响应内容

    虽然json(),text(),read()很方便的能把响应的数据读入到内存,但是我们仍然应该谨慎的使用它们,因为它们是把整个的响应体全部读入了内存。即使你只是想下载几个字节大小的文件,但这些方法却将在内存中加载所有的数据。所以我们可以通过控制字节数来控制读入内存的响应内容:

    async with session.get('https://api.github.com/events') as resp:
        await resp.content.read(10) #读取前10个字节
    

    一般地,我们应该使用以下的模式来把读取的字节流保存到文件中:

    with open(filename, 'wb') as fd:
        while True:
            chunk = await resp.content.read(chunk_size)
            if not chunk:
                break
            fd.write(chunk)
    
  5. resp.status - 状态码

  6. resp.method -

  7. resp.url - url,通常加上str()

  8. resp.real_url

  9. resp.content

  10. resp.cookies

  11. resp.headers

  12. resp.raw_headers

  13. resp.links

  14. resp.content_type

  15. resp.charset

  16. resp.content_disposition

  17. resp.history

https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientResponse

Timeout - 超时设置

如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长

timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession(timeout=timeout) as session:
    async with session.get(url, timeout=timeout) as resp:
    ...

Concurrency - 并发限制

由于 aiohttp 可以支持非常大的并发,比如上万、十万、百万都是能做到的,但这么大的并发量,目标网站是很可能在短时间内无法响应的,而且很可能瞬时间将目标网站爬挂掉。所以我们需要控制一下爬取的并发量。

在一般情况下,我们可以借助于 asyncio 的 Semaphore[ˈseməfɔː(r)](信号标; 旗语;)来控制并发量,代码示例如下:

import asyncio
import aiohttp

concurrency = 10 # 最大并发量
semaphore = asyncio.Semaphore(concurrency) # 信号量

async def get(session):
  async with semaphore:
    async with session.get("https://chenxuefan.cn/",verify_ssl=False) as resp:
      print(await resp.status)
    
async def main():
  async with aiohttp.ClientSession() as session:
    tasks = [asyncio.ensure_future(main()) for _ in range(1000)]
    asyncio.gather(*tasks)
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())


1206 字