Python 异步编程深入理解:asyncio 实战与原理剖析

在业务开发中,我们经常会遇到 IO 密集型场景:HTTP 请求、文件读写、数据库查询等。传统的同步编程模式在这些场景下效率低下,而多线程又有 GIL 的限制。Python 3.4 引入的 asyncio 为我们提供了一种全新的并发方案。

同步与异步的本质区别

理解异步编程,首先需要区分同步异步的核心差异。

同步执行时,一个任务必须等待前一个任务完全结束才能开始。当程序发起一个 IO 请求(如 HTTP 调用),线程会被阻塞,直到数据返回后才继续执行后续代码。这种方式简单直观,但在高并发场景下资源利用率很低。

异步执行则不同,当遇到 IO 操作时,程序不会原地等待,而是注册一个回调,继续处理其他任务。当 IO 完成时,系统会通知程序处理结果。这就像餐厅的点餐系统:服务员把订单送到厨房后可以去服务其他桌,而不是站在厨房门口等菜做好。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 同步方式:串行执行,总耗时 = 请求1 + 请求2 + 请求3
def sync_requests():
result1 = requests.get(url1) # 阻塞等待
result2 = requests.get(url2) # 阻塞等待
result3 = requests.get(url3) # 阻塞等待
return result1, result2, result3

# 异步方式:并发执行,总耗时 ≈ max(请求1, 请求2, 请求3)
async def async_requests():
result1 = await http_get(url1)
result2 = await http_get(url2)
result3 = await http_get(url3)
return result1, result2, result3

事件循环的工作原理

asyncio 的核心是事件循环(Event Loop)。事件循环本质上是一个无限循环,负责:

  1. 监听 IO 事件(socket 可读、可写、异常等)
  2. 分发回调给等待的协程
  3. 调度新加入的协程

当我们调用 asyncio.run() 时,背后发生了什么?

1
2
3
4
5
6
7
8
9
10
11
# asyncio.run() 的简化实现
def asyncio_run(coro):
# 1. 创建事件循环
loop = asyncio.new_event_loop()
# 2. 设置为当前线程的事件循环
asyncio.set_event_loop(loop)
try:
# 3. 运行协程直到完成
loop.run_until_complete(coro)
finally:
loop.close()

事件循环内部维护着一个任务队列IO选择器。当协程执行到 await 时,控制权交还给事件循环,事件循环会继续处理其他就绪的任务或监听新的 IO 事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
┌─────────────────────────────────────────┐
│ 事件循环 │
│ ┌─────────────────────────────────┐ │
│ │ 任务队列 │ │
│ │ [Task1] [Task2] [Task3] ... │ │
│ └─────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ IO 选择器(Selector) │ │
│ │ 监听 socket、pipe 等 IO 事件 │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘

协程的创建与调度

协程(Coroutine)是 asyncio 的基础单元。它是一种可以暂停和恢复执行的函数。

定义协程函数

使用 async def 定义的函数就是协程函数:

1
2
3
4
async def fetch_data():
# 这是协程函数,调用它会返回一个协程对象
await asyncio.sleep(1) # 模拟 IO 操作
return {"data": "result"}

注意,调用协程函数不会立即执行,只会创建一个协程对象:

1
2
coro = fetch_data()  # 这只是创建协程对象,函数体还没执行
print(type(coro)) # <class 'coroutine'>

调度协程执行

要让协程真正运行,需要将其交给事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 方式1:asyncio.run()(推荐)
async def main():
result = await fetch_data()
print(result)

asyncio.run(main())

# 方式2:手动管理事件循环
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(fetch_data())
finally:
loop.close()

创建多个任务

使用 asyncio.create_task() 可以并发调度多个协程:

1
2
3
4
5
6
7
8
9
10
11
async def main():
# 创建三个任务并立即调度执行
task1 = asyncio.create_task(fetch_data())
task2 = asyncio.create_task(fetch_data())
task3 = asyncio.create_task(fetch_data())

# 并发等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
print(results)

asyncio.run(main())

await 关键字详解

await 是异步编程的核心语法,它的作用是:

  1. 等待一个可等待对象(Awaitable)完成
  2. 暂停当前协程,将控制权交还给事件循环
  3. 恢复协程执行,获取结果

哪些对象可以被 await

  • 协程对象(Coroutine)
  • Task 对象
  • Future 对象
  • 支持 __await__ 方法的对象
1
2
3
4
5
6
7
8
9
10
11
async def example():
# 1. await 协程
result = await asyncio.sleep(1)

# 2. await Task
task = asyncio.create_task(some_coro())
result = await task

# 3. await Future(通常不需要手动 await Future)
future = asyncio.Future()
result = await future

await 的执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def coro1():
print("coro1 开始")
await asyncio.sleep(1) # 暂停,等待事件循环调度
print("coro1 结束")
return "coro1 结果"

async def main():
result = await coro1() # main 暂停,等待 coro1 完成
print(f"收到结果: {result}")

# 执行流程:
# 1. asyncio.run(main()) 启动事件循环
# 2. main() 开始执行,遇到 await coro1()
# 3. main() 暂停,coro1 被调度执行
# 4. coro1 遇到 sleep,暂停
# 5. 1秒后,sleep 完成,coro1 恢复执行
# 6. coro1 返回结果,main 恢复,收到结果

Future 与 Task 的关系

Future

Future 是一个占位符,代表一个将来才会完成的操作。它有几个状态:

  • Pending:等待执行
  • Running:正在执行
  • Done:执行完成
  • Cancelled:被取消
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def with_future():
# 创建一个 Future
future = asyncio.Future()

# 在后台设置结果(模拟异步操作)
async def set_result():
await asyncio.sleep(1)
future.set_result("完成")

# 并行执行
asyncio.create_task(set_result())

# 等待 future 完成
result = await future
print(result) # "完成"

Task

TaskFuture 的子类,专门用于调度协程执行。asyncio.create_task() 实际上就是创建了一个 Task:

1
2
3
# 两种方式创建 Task,效果相同
task1 = asyncio.create_task(coro()) # 推荐
task2 = asyncio.ensure_future(coro()) # 兼容旧代码

Task 相比 Future 的优势是:它自动绑定协程,自动管理执行状态。

1
2
3
4
5
6
7
8
9
10
async def demo_task():
await asyncio.sleep(1)
return "done"

# Task 可以在创建后立即继续执行,不阻塞
async def main():
task = asyncio.create_task(demo_task())
print("做其他事情...")
result = await task # 这里才真正等待
print(result)

实际场景:异步 HTTP 请求

asyncio 最常见的应用场景是并发 HTTP 请求。使用 aiohttp 库可以轻松实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import aiohttp
import asyncio

async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
# 创建所有任务
tasks = [fetch(session, url) for url in urls]
# 并发执行
results = await asyncio.gather(*tasks)
return results

async def fetch(session, url):
async with session.get(url) as response:
return await response.text()

# 串行执行:总耗时 = sum(每个请求时间)
# 并发执行:总耗时 ≈ max(每个请求时间)

性能对比

假设有 10 个 HTTP 请求,每个耗时 200ms:

方式 理论耗时
同步串行 2000ms
多线程(10线程) ~200ms
asyncio 并发 ~200ms

asyncio 在 IO 密集型场景下,可以达到与多线程相同的效率,但省去了线程创建和切换的开销。

常见陷阱与最佳实践

陷阱1:在非协程函数中调用协程

1
2
3
4
5
6
7
# 错误写法
def sync_func():
result = await some_coro() # SyntaxError!

# 正确写法
async def async_func():
result = await some_coro()

陷阱2:忘记 await

1
2
3
4
5
6
7
# 错误写法:协程没被执行
async def main():
some_coro() # 返回协程对象,但没有执行

# 正确写法
async def main():
await some_coro()

陷阱3:阻塞事件循环

1
2
3
4
5
6
7
8
# 错误:在协程中调用阻塞代码
async def bad_example():
result = blocking_io_operation() # 会阻塞整个事件循环!

# 正确:使用 run_in_executor 将阻塞操作放到线程池
async def good_example():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_io_operation)

最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 1. 使用 asyncio.run() 作为入口
async def main():
await asyncio.gather(task1, task2, task3)

asyncio.run(main()) # 自动处理事件循环生命周期

# 2. 合理设置超时
async def with_timeout():
try:
result = await asyncio.wait_for(coro(), timeout=5.0)
except asyncio.TimeoutError:
print("操作超时")

# 3. 使用 asyncio.Semaphore 控制并发数
async def limited_concurrent():
semaphore = asyncio.Semaphore(5) # 最多5个并发

async def limited_task(url):
async with semaphore:
return await fetch(url)

await asyncio.gather(*[limited_task(url) for url in urls])

# 4. 异常处理
async def with_exception_handling():
try:
await risky_operation()
except Exception as e:
print(f"发生错误: {e}")

总结

asyncio 为 Python 提供了强大的异步编程能力。核心要点:

  1. 事件循环是 asyncio 的调度中心
  2. 协程是可暂停/恢复的函数,用 async def 定义
  3. await 等待可等待对象,暂停当前协程
  4. Task 是 Future 的子类,用于调度协程
  5. 避免在协程中调用阻塞代码,使用 run_in_executor

掌握 asyncio 的原理和使用方式,能够在 IO 密集型场景下显著提升程序性能。