云计算百科
云计算领域专业知识百科平台

Python aiohttp 全面指南:异步HTTP客户端/服务器框架

边写代码零食不停口 盼盼麦香鸡味块 、卡乐比(Calbee)薯条三兄弟 独立小包、好时kisses多口味巧克力糖、老金磨方【黑金系列】黑芝麻丸

边写代码边贴面膜 事业美丽两不误 DR. YS 野森博士+【AOUFSE/澳芙雪特证】377专研美白淡斑面膜组合 优惠劵

别光顾写代码更要多喝茶水,提神有营养 六安瓜片茶叶茶香二级200g 2025年新茶雨前盒装自己喝

让AI成为我们的得力助手:《用Cursor玩转AI辅助编程——不写代码也能做软件开发》

Python 图书推荐

书名出版社推荐
Python编程 从入门到实践 第3版(图灵出品) 人民邮电出版社 ★★★★★
Python数据科学手册(第2版)(图灵出品) 人民邮电出版社 ★★★★★
图形引擎开发入门:基于Python语言 电子工业出版社 ★★★★★
科研论文配图绘制指南 基于Python(异步图书出品) 人民邮电出版社 ★★★★★
Effective Python:编写好Python的90个有效方法(第2版 英文版) 人民邮电出版社 ★★★★★
Python人工智能与机器学习(套装全5册) 清华大学出版社 ★★★★★

JAVA 图书推荐

书名出版社推荐
Java核心技术 第12版:卷Ⅰ+卷Ⅱ 机械工业出版社 ★★★★★
Java核心技术 第11版 套装共2册 机械工业出版社 ★★★★★
Java语言程序设计基础篇+进阶篇 原书第12版 套装共2册 机械工业出版社 ★★★★★
Java 11官方参考手册(第11版) 清华大学出版社 ★★★★★
Offer来了:Java面试核心知识点精讲(第2版)(博文视点出品) 电子工业出版社 ★★★★★

什么是 aiohttp?

aiohttp 是一个基于 Python asyncio 的异步 HTTP 客户端/服务器框架,专为高性能网络编程设计。它提供了:

  • 异步 HTTP 客户端(类似异步版 requests)
  • 异步 HTTP 服务器(类似异步版 Flask/Django)
  • 完整的 WebSocket 支持
  • 高效的连接池管理
  • 核心优势

    特性描述
    异步非阻塞 单线程处理数千并发连接
    高性能 远超同步框架(如 requests)的吞吐量
    轻量级 简洁的API,无复杂依赖
    全面协议支持 HTTP/1.1, HTTP/2(客户端), WebSocket
    生态完善 良好文档和活跃社区

    基础用法 – HTTP客户端

    安装

    pip install aiohttp

    基本GET请求

    import aiohttp
    import asyncio

    async def main():
    async with aiohttp.ClientSession() as session:
    async with session.get('https://api.example.com/data') as response:
    print("状态码:", response.status)
    print("响应内容:", await response.text())

    asyncio.run(main())

    POST请求示例

    async def post_example():
    async with aiohttp.ClientSession() as session:
    # 表单数据
    async with session.post(
    'https://httpbin.org/post',
    data={'key': 'value'}
    ) as response:
    print(await response.json())

    # JSON数据
    async with session.post(
    'https://api.example.com/users',
    json={'name': 'Alice', 'age': 30}
    ) as response:
    print(await response.json())

    高级用法

    并发请求

    async def fetch(url):
    async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
    return await response.text()

    async def concurrent_requests():
    urls = [
    'https://api.example.com/item/1',
    'https://api.example.com/item/2',
    'https://api.example.com/item/3'
    ]

    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)

    for url, content in zip(urls, results):
    print(f"{url}: {content[:50]}…")

    asyncio.run(concurrent_requests())

    超时控制

    async def timeout_example():
    timeout = aiohttp.ClientTimeout(total=5) # 5秒总超时

    async with aiohttp.ClientSession(timeout=timeout) as session:
    try:
    async with session.get('https://slow-api.example.com') as response:
    return await response.text()
    except asyncio.TimeoutError:
    print("请求超时!")

    流式处理大响应

    async def stream_response():
    async with aiohttp.ClientSession() as session:
    async with session.get('https://large-file.example.com') as response:
    with open('large_file.txt', 'wb') as f:
    async for chunk in response.content.iter_chunked(1024):
    f.write(chunk)
    print(f"已接收 {len(chunk)} 字节")

    服务器端开发

    基本HTTP服务器

    from aiohttp import web

    async def handle(request):
    name = request.match_info.get('name', "World")
    return web.Response(text=f"Hello, {name}!")

    app = web.Application()
    app.add_routes([
    web.get('/', handle),
    web.get('/{name}', handle)
    ])

    if __name__ == '__main__':
    web.run_app(app, port=8080)

    REST API示例

    async def get_users(request):
    users = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
    return web.json_response(users)

    async def create_user(request):
    data = await request.json()
    # 实际应用中这里会保存到数据库
    return web.json_response({'id': 3, **data}, status=201)

    app = web.Application()
    app.add_routes([
    web.get('/api/users', get_users),
    web.post('/api/users', create_user)
    ])

    WebSocket服务器

    async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
    if msg.type == aiohttp.WSMsgType.TEXT:
    if msg.data == 'close':
    await ws.close()
    else:
    await ws.send_str(f"ECHO: {msg.data}")
    elif msg.type == aiohttp.WSMsgType.ERROR:
    print('WebSocket连接异常关闭')

    return ws

    app.add_routes([web.get('/ws', websocket_handler)])

    进阶扩展

    中间件示例

    async def auth_middleware(app, handler):
    async def middleware(request):
    # 验证API密钥
    if request.headers.get('X-API-Key') != 'SECRET_KEY':
    return web.json_response({'error': 'Unauthorized'}, status=401)
    return await handler(request)
    return middleware

    app = web.Application(middlewares=[auth_middleware])

    HTTP/2客户端支持

    async def http2_request():
    conn = aiohttp.TCPConnector(force_close=True, enable_cleanup_closed=True)
    async with aiohttp.ClientSession(connector=conn) as session:
    async with session.get(
    'https://http2.akamai.com/',
    headers={'accept': 'text/html'}
    ) as response:
    print("HTTP版本:", response.version)
    print("内容:", await response.text()[:200])

    性能优化配置

    # 自定义连接器配置
    connector = aiohttp.TCPConnector(
    limit=100, # 最大并发连接数
    limit_per_host=20, # 单主机最大连接数
    ssl=False, # 禁用SSL验证(仅用于测试)
    force_close=True # 避免连接延迟关闭
    )

    # 自定义会话配置
    session = aiohttp.ClientSession(
    connector=connector,
    timeout=aiohttp.ClientTimeout(total=30),
    headers={'User-Agent': 'MyApp/1.0'},
    cookie_jar=aiohttp.CookieJar(unsafe=True)
    )

    最佳实践

  • 重用ClientSession:避免为每个请求创建新会话
  • 使用连接池:合理配置TCPConnector参数
  • 超时设置:总是配置合理的超时时间
  • 资源清理:使用async with确保资源释放
  • 错误处理:捕获并处理常见网络异常try:
    async with session.get(url) as response:
    response.raise_for_status()
    return await response.json()
    except aiohttp.ClientError as e:
    print(f"请求错误: {e}")
  • 完整示例

    import aiohttp
    import asyncio
    from aiohttp import web

    # 客户端示例
    async def fetch_data():
    async with aiohttp.ClientSession() as session:
    # 并发请求多个API
    urls = [
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/comments/1',
    'https://jsonplaceholder.typicode.com/albums/1'
    ]

    tasks = []
    for url in urls:
    tasks.append(session.get(url))

    responses = await asyncio.gather(*tasks)

    results = []
    for response in responses:
    results.append(await response.json())

    return results

    # 服务器示例
    async def handle_index(request):
    return web.Response(text="Welcome to aiohttp server!")

    async def handle_api(request):
    data = await fetch_data()
    return web.json_response(data)

    # 创建应用
    app = web.Application()
    app.add_routes([
    web.get('/', handle_index),
    web.get('/api', handle_api)
    ])

    # 启动服务器
    async def start_server():
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("Server running at http://localhost:8080")

    # 保持运行
    while True:
    await asyncio.sleep(3600) # 每小时唤醒一次

    if __name__ == '__main__':
    asyncio.run(start_server())

    总结

    aiohttp 是 Python 异步生态中处理 HTTP 通信的首选工具,它提供了:

  • 高效客户端:用于高性能爬虫、API调用
  • 轻量级服务器:构建高性能Web服务和API
  • WebSocket支持:实现实时双向通信
  • 连接池管理:优化资源利用率
  • 通过合理利用 aiohttp 的异步特性,开发者可以轻松构建出能够处理数万并发连接的高性能网络应用,同时保持代码的简洁性和可维护性。

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » Python aiohttp 全面指南:异步HTTP客户端/服务器框架
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!