在业务开发中,我们经常会遇到 IO 密集型场景:HTTP 请求、文件读写、数据库查询等。传统的同步编程模式在这些场景下效率低下,而多线程又有 GIL 的限制。Python 3.4 引入的 asyncio 为我们提供了一种全新的并发方案。
同步与异步的本质区别
理解异步编程,首先需要区分同步与异步的核心差异。
同步执行时,一个任务必须等待前一个任务完全结束才能开始。当程序发起一个 IO 请求(如 HTTP 调用),线程会被阻塞,直到数据返回后才继续执行后续代码。这种方式简单直观,但在高并发场景下资源利用率很低。
异步执行则不同,当遇到 IO 操作时,程序不会原地等待,而是注册一个回调,继续处理其他任务。当 IO 完成时,系统会通知程序处理结果。这就像餐厅的点餐系统:服务员把订单送到厨房后可以去服务其他桌,而不是站在厨房门口等菜做好。
1 2 3 4 5 6 7 8 9 10 11 12 13
| def sync_requests(): result1 = requests.get(url1) result2 = requests.get(url2) result3 = requests.get(url3) return result1, result2, result3
async def async_requests(): result1 = await http_get(url1) result2 = await http_get(url2) result3 = await http_get(url3) return result1, result2, result3
|
事件循环的工作原理
asyncio 的核心是事件循环(Event Loop)。事件循环本质上是一个无限循环,负责:
- 监听 IO 事件(socket 可读、可写、异常等)
- 分发回调给等待的协程
- 调度新加入的协程
当我们调用 asyncio.run() 时,背后发生了什么?
1 2 3 4 5 6 7 8 9 10 11
| def asyncio_run(coro): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(coro) finally: loop.close()
|
事件循环内部维护着一个任务队列和IO选择器。当协程执行到 await 时,控制权交还给事件循环,事件循环会继续处理其他就绪的任务或监听新的 IO 事件。
1 2 3 4 5 6 7 8 9 10 11 12 13
| ┌─────────────────────────────────────────┐ │ 事件循环 │ │ ┌─────────────────────────────────┐ │ │ │ 任务队列 │ │ │ │ [Task1] [Task2] [Task3] ... │ │ │ └─────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────┐ │ │ │ IO 选择器(Selector) │ │ │ │ 监听 socket、pipe 等 IO 事件 │ │ │ └─────────────────────────────────┘ │ └─────────────────────────────────────────┘
|
协程的创建与调度
协程(Coroutine)是 asyncio 的基础单元。它是一种可以暂停和恢复执行的函数。
定义协程函数
使用 async def 定义的函数就是协程函数:
1 2 3 4
| async def fetch_data(): await asyncio.sleep(1) return {"data": "result"}
|
注意,调用协程函数不会立即执行,只会创建一个协程对象:
1 2
| coro = fetch_data() print(type(coro))
|
调度协程执行
要让协程真正运行,需要将其交给事件循环:
1 2 3 4 5 6 7 8 9 10 11 12 13
| async def main(): result = await fetch_data() print(result)
asyncio.run(main())
loop = asyncio.new_event_loop() try: result = loop.run_until_complete(fetch_data()) finally: loop.close()
|
创建多个任务
使用 asyncio.create_task() 可以并发调度多个协程:
1 2 3 4 5 6 7 8 9 10 11
| async def main(): task1 = asyncio.create_task(fetch_data()) task2 = asyncio.create_task(fetch_data()) task3 = asyncio.create_task(fetch_data())
results = await asyncio.gather(task1, task2, task3) print(results)
asyncio.run(main())
|
await 关键字详解
await 是异步编程的核心语法,它的作用是:
- 等待一个可等待对象(Awaitable)完成
- 暂停当前协程,将控制权交还给事件循环
- 恢复协程执行,获取结果
哪些对象可以被 await
- 协程对象(Coroutine)
- Task 对象
- Future 对象
- 支持
__await__ 方法的对象
1 2 3 4 5 6 7 8 9 10 11
| async def example(): result = await asyncio.sleep(1)
task = asyncio.create_task(some_coro()) result = await task
future = asyncio.Future() result = await future
|
await 的执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| async def coro1(): print("coro1 开始") await asyncio.sleep(1) print("coro1 结束") return "coro1 结果"
async def main(): result = await coro1() print(f"收到结果: {result}")
|
Future 与 Task 的关系
Future
Future 是一个占位符,代表一个将来才会完成的操作。它有几个状态:
Pending:等待执行
Running:正在执行
Done:执行完成
Cancelled:被取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| async def with_future(): future = asyncio.Future()
async def set_result(): await asyncio.sleep(1) future.set_result("完成")
asyncio.create_task(set_result())
result = await future print(result)
|
Task
Task 是 Future 的子类,专门用于调度协程执行。asyncio.create_task() 实际上就是创建了一个 Task:
1 2 3
| task1 = asyncio.create_task(coro()) task2 = asyncio.ensure_future(coro())
|
Task 相比 Future 的优势是:它自动绑定协程,自动管理执行状态。
1 2 3 4 5 6 7 8 9 10
| async def demo_task(): await asyncio.sleep(1) return "done"
async def main(): task = asyncio.create_task(demo_task()) print("做其他事情...") result = await task print(result)
|
实际场景:异步 HTTP 请求
asyncio 最常见的应用场景是并发 HTTP 请求。使用 aiohttp 库可以轻松实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import aiohttp import asyncio
async def fetch_all(urls): async with aiohttp.ClientSession() as session: tasks = [fetch(session, url) for url in urls] results = await asyncio.gather(*tasks) return results
async def fetch(session, url): async with session.get(url) as response: return await response.text()
|
性能对比
假设有 10 个 HTTP 请求,每个耗时 200ms:
| 方式 |
理论耗时 |
| 同步串行 |
2000ms |
| 多线程(10线程) |
~200ms |
| asyncio 并发 |
~200ms |
asyncio 在 IO 密集型场景下,可以达到与多线程相同的效率,但省去了线程创建和切换的开销。
常见陷阱与最佳实践
陷阱1:在非协程函数中调用协程
1 2 3 4 5 6 7
| def sync_func(): result = await some_coro()
async def async_func(): result = await some_coro()
|
陷阱2:忘记 await
1 2 3 4 5 6 7
| async def main(): some_coro()
async def main(): await some_coro()
|
陷阱3:阻塞事件循环
1 2 3 4 5 6 7 8
| async def bad_example(): result = blocking_io_operation()
async def good_example(): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, blocking_io_operation)
|
最佳实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| async def main(): await asyncio.gather(task1, task2, task3)
asyncio.run(main())
async def with_timeout(): try: result = await asyncio.wait_for(coro(), timeout=5.0) except asyncio.TimeoutError: print("操作超时")
async def limited_concurrent(): semaphore = asyncio.Semaphore(5)
async def limited_task(url): async with semaphore: return await fetch(url)
await asyncio.gather(*[limited_task(url) for url in urls])
async def with_exception_handling(): try: await risky_operation() except Exception as e: print(f"发生错误: {e}")
|
总结
asyncio 为 Python 提供了强大的异步编程能力。核心要点:
- 事件循环是 asyncio 的调度中心
- 协程是可暂停/恢复的函数,用
async def 定义
- await 等待可等待对象,暂停当前协程
- Task 是 Future 的子类,用于调度协程
- 避免在协程中调用阻塞代码,使用
run_in_executor
掌握 asyncio 的原理和使用方式,能够在 IO 密集型场景下显著提升程序性能。