Python 并发编程:多线程、多进程、协程对比与选用

Python 提供了三种主要的并发方式:多线程(threading)、多进程(multiprocessing)、协程(asyncio)。每种方式都有其适用场景,理解它们的区别是写出高效并发代码的前提。

GIL 的影响与限制

在讨论具体方案前,必须理解 Python 的 GIL(Global Interpreter Lock)。

GIL 是什么

GIL 是 CPython 的一个机制:同一时刻,只有一个线程执行 Python 字节码。这意味着多线程在 CPU 密集型任务中无法真正并行。

import threading
import time


def cpu_task(n):
    """CPU-bound task: sum of squares below *n* (pure bytecode, never releases the GIL)."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result


def main():
    """Compare single-threaded vs two-threaded wall time for a CPU-bound task."""
    # Single-thread baseline.
    start = time.time()
    cpu_task(10_000_000)
    print(f"单线程: {time.time() - start:.2f}s")

    # Two threads: no speed-up, because the GIL lets only one thread execute
    # Python bytecode at a time; thread switching may even make it slower.
    start = time.time()
    t1 = threading.Thread(target=cpu_task, args=(10_000_000,))
    t2 = threading.Thread(target=cpu_task, args=(10_000_000,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    print(f"双线程: {time.time() - start:.2f}s")


# Guard so that importing this module does not trigger the benchmark
# (also consistent with the multiprocessing examples below).
if __name__ == '__main__':
    main()

结果:双线程没有加速,甚至可能更慢(线程切换开销)。

GIL 影响的场景

场景 GIL 影响 原因
CPU 密集型 严重 同一时刻只有一个线程执行
IO 密集型 轻微 IO 时线程主动释放 GIL
C 扩展调用 可绕过 扩展可以主动释放 GIL
NumPy 操作 轻微 NumPy 内部计算时释放 GIL

解决方案

# CPU 密集型 → 多进程
from multiprocessing import Pool

# The __main__ guard is REQUIRED here: with the 'spawn' start method
# (default on Windows and macOS) every worker re-imports this module, and
# an unguarded Pool would recursively spawn workers forever.
if __name__ == '__main__':
    with Pool(4) as p:
        # cpu_task is the CPU-bound function defined earlier in the article.
        results = p.map(cpu_task, [10_000_000] * 4)

# IO 密集型 → asyncio 或 threading
# (threading 在 IO 时会释放 GIL)

threading 模块实战

多线程适合 IO 密集型 任务。

基础用法

import threading
import time


def fetch_url(url):
    """Simulate an HTTP request: 1 second of blocking IO."""
    print(f"开始请求: {url}")
    time.sleep(1)  # simulated IO wait — the GIL is released while sleeping
    print(f"完成请求: {url}")
    return f"结果: {url}"


def main():
    """Compare sequential vs multi-threaded execution of IO-bound calls."""
    # Sequential, single-threaded.
    start = time.time()
    for url in ['url1', 'url2', 'url3']:
        fetch_url(url)
    print(f"串行耗时: {time.time() - start:.2f}s")  # ~3s

    # Concurrent with threads.
    start = time.time()
    threads = []
    for url in ['url1', 'url2', 'url3']:
        t = threading.Thread(target=fetch_url, args=(url,))
        threads.append(t)
        t.start()

    # Wait for every thread to finish.
    for t in threads:
        t.join()
    print(f"多线程耗时: {time.time() - start:.2f}s")  # ~1s


# Guard: importing this module must not run ~4 seconds of sleeps.
if __name__ == '__main__':
    main()

线程间通信:Queue

import threading
from queue import Queue


def producer(queue):
    """Put five integers (0..4) on the queue."""
    for i in range(5):
        queue.put(i)
        print(f"生产: {i}")


def consumer(queue):
    """Consume items until a None sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:  # sentinel value
            break
        print(f"消费: {item}")
        queue.task_done()


if __name__ == '__main__':
    q = Queue()
    threads = []

    # Start the producer.
    # BUG FIX: the original appended the producer thread WITHOUT calling
    # start(), so threads[0].join() raised RuntimeError
    # ("cannot join thread before it is started").
    producer_thread = threading.Thread(target=producer, args=(q,))
    threads.append(producer_thread)
    producer_thread.start()

    # Start two consumers.
    for _ in range(2):
        t = threading.Thread(target=consumer, args=(q,))
        threads.append(t)
        t.start()

    # Wait for the producer to finish.
    threads[0].join()

    # One sentinel per consumer to stop them.
    for _ in range(2):
        q.put(None)

    for t in threads[1:]:
        t.join()

线程同步:Lock

import threading

# Shared mutable state: four threads each add 100_000 to this counter.
counter = 0
lock = threading.Lock()


def increment():
    """Add 1 to the global counter 100_000 times, holding the lock each time."""
    global counter
    for _ in range(100_000):
        # '+= 1' is a read-modify-write, not atomic — the lock makes it safe.
        with lock:
            counter += 1


threads = []
for _ in range(4):
    worker = threading.Thread(target=increment)
    threads.append(worker)
    worker.start()
for worker in threads:
    worker.join()

print(f"计数器: {counter}")  # correct: 400000

multiprocessing 模块实战

多进程绕过 GIL,适合 CPU 密集型 任务。

基础用法

from multiprocessing import Process, cpu_count
import time


def cpu_task(n):
    """CPU-bound task: sum of squares below *n*."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result


if __name__ == '__main__':
    print(f"CPU 核心数: {cpu_count()}")

    # Single process: one task.
    start = time.time()
    cpu_task(10_000_000)
    print(f"单进程: {time.time() - start:.2f}s")

    # Four processes, each running the SAME task in parallel.
    start = time.time()
    processes = []
    for _ in range(4):
        p = Process(target=cpu_task, args=(10_000_000,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    # NOTE (fix): wall time here is roughly EQUAL to the single-process run,
    # but 4x the work was done — i.e. ~4x throughput. The original comment
    # claiming "about 1/4 of the single-process time" was wrong, because
    # each process repeats the full task rather than splitting it.
    print(f"多进程: {time.time() - start:.2f}s")

Pool 更高级的接口

from multiprocessing import Pool
import time


def heavy_compute(n):
    """Sum of squares below *n* — a picklable CPU-bound task."""
    return sum(i ** 2 for i in range(n))


if __name__ == '__main__':
    # map: submit a batch, results come back in submission order.
    with Pool(4) as pool:
        results = pool.map(heavy_compute, [1_000_000] * 8)
        print(f"结果: {results}")

    # imap: lazy iteration — results are yielded as they complete (in order).
    with Pool(4) as pool:
        for result in pool.imap(heavy_compute, [1_000_000] * 8):
            print(f"完成: {result}")

    # apply_async: submit one task asynchronously, fetch the result later.
    with Pool(4) as pool:
        async_result = pool.apply_async(heavy_compute, (1_000_000,))
        print(f"异步结果: {async_result.get()}")

    # Callback runs in the parent process when the task completes.
    def callback(result):
        print(f"任务完成: {result}")

    with Pool(4) as pool:
        async_result = pool.apply_async(heavy_compute, (1_000_000,), callback=callback)
        # BUG FIX: wait on the AsyncResult instead of the original
        # time.sleep(1). Leaving the 'with' block calls Pool.terminate(),
        # so a fixed sleep raced the task and could kill it before the
        # callback ever fired.
        async_result.wait()

进程间通信

from multiprocessing import Process, Queue, Pipe, Manager


# Queue: both thread-safe and process-safe.
def worker_queue(q):
    """Echo worker: doubles each item back onto the queue until None arrives.

    Illustrative only — it is not launched below.
    """
    while True:
        item = q.get()
        if item is None:
            break
        q.put(item * 2)


# Pipe: best suited for communication between exactly two processes.
def worker_pipe(conn):
    """Upper-case every message received until the string 'quit' arrives."""
    while True:
        msg = conn.recv()
        if msg == 'quit':
            break
        conn.send(msg.upper())


# Manager: shared state across processes.
def worker_manager(shared_dict, lock=None):
    """Increment shared_dict['count'] once, under *lock* when one is given.

    BUG FIX: get-then-set is NOT atomic across processes — without a lock,
    two workers can read the same value and one increment is silently lost,
    so the final count could be less than the number of workers.
    """
    if lock is not None:
        with lock:
            shared_dict['count'] = shared_dict.get('count', 0) + 1
    else:
        shared_dict['count'] = shared_dict.get('count', 0) + 1


# BUG FIX: the original created processes at module level. Under the 'spawn'
# start method each child re-imports the module, so the guard is mandatory.
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker_pipe, args=(child_conn,))
    p.start()

    parent_conn.send('hello')
    print(parent_conn.recv())  # 'HELLO'
    parent_conn.send('quit')
    p.join()

    manager = Manager()
    shared = manager.dict()
    shared_lock = manager.Lock()

    processes = [
        Process(target=worker_manager, args=(shared, shared_lock))
        for _ in range(10)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(shared)  # {'count': 10}

concurrent.futures 高层接口

concurrent.futures 是 Python 3.2 引入的高层接口,封装了 threading 和 multiprocessing。

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def fetch_data(url):
    """Simulate a 1-second blocking IO request; returns a result string."""
    time.sleep(1)
    return f"结果: {url}"


# Guard: importing this module must not run ~4 seconds of simulated IO.
if __name__ == '__main__':
    urls = [f"url_{i}" for i in range(10)]

    # Option 1: map — results come back in submission order.
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(fetch_data, urls)
        for r in results:
            print(r)

    # Option 2: submit + as_completed — results arrive in COMPLETION order,
    # with per-task error handling.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(fetch_data, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                result = future.result()
                print(f"{url} -> {result}")
            except Exception as e:
                print(f"{url} 失败: {e}")

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import math


def is_prime(n):
    """Trial-division primality test; False for n < 2."""
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True


# BUG FIX: the executor originally ran at module level — with the 'spawn'
# start method that crashes/recurses, so the __main__ guard is mandatory.
if __name__ == '__main__':
    numbers = range(1000000)

    with ProcessPoolExecutor(max_workers=4) as executor:
        # chunksize batches many tiny tasks per IPC round-trip; with a
        # million items the default chunksize of 1 makes the pickling /
        # IPC overhead dominate the actual computation.
        results = executor.map(is_prime, numbers, chunksize=10_000)

        primes = [n for n, is_p in zip(numbers, results) if is_p]
        print(f"找到 {len(primes)} 个质数")

异步等待

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def task(i):
    """Trivial demo task. BUG FIX: 'task' was used but never defined (NameError)."""
    return i * i


with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(10)]

    # Wait for everything (default return_when=ALL_COMPLETED).
    done, not_done = wait(futures)
    print(f"完成: {len(done)}, 未完成: {len(not_done)}")

    # Wait only until the first future finishes:
    # done, _ = wait(futures, return_when=FIRST_COMPLETED)

协程与线程的选择

性能对比

# IO-bound comparison: asyncio vs threading vs multiprocessing.
import asyncio
import threading
import time


async def async_io():
    """Simulate 0.1s of non-blocking IO."""
    await asyncio.sleep(0.1)


def sync_io():
    """Simulate 0.1s of blocking IO (the GIL is released while sleeping)."""
    time.sleep(0.1)


async def run_async_batch():
    """Run 100 concurrent async sleeps.

    BUG FIXES vs the original:
    - 'for _ in 100' iterated an int (TypeError) — must be range(100);
    - asyncio.gather() was called outside a running event loop, which fails
      on modern Python; gather must be awaited inside a coroutine.
    """
    await asyncio.gather(*[async_io() for _ in range(100)])


if __name__ == '__main__':
    # asyncio: 100 concurrent sleeps multiplexed on one thread.
    start = time.time()
    asyncio.run(run_async_batch())
    print(f"asyncio: {time.time() - start:.2f}s")  # ~0.1s

    # threading: 100 OS threads overlap because sleep releases the GIL.
    start = time.time()
    threads = [threading.Thread(target=sync_io) for _ in range(100)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    print(f"threading: {time.time() - start:.2f}s")  # ~0.1s

选择指南

场景 推荐方案 原因
HTTP 请求并发 asyncio / threading IO 密集
文件批量处理 asyncio / threading IO 密集
CPU 密集计算 multiprocessing 绕过 GIL
图像处理 multiprocessing 绕过 GIL
实时响应 asyncio 事件驱动
简单并行 concurrent.futures 统一接口

实战:并发下载任务

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time


def download_file(url, session=None):
    """Download a single file.

    Returns a (url, size_in_bytes, error) tuple: *error* is None on success,
    otherwise the error message (and size is 0). Never raises.
    """
    # Fall back to a throwaway session when the caller did not supply one.
    session = session or requests.Session()
    try:
        response = session.get(url, timeout=10)
        response.raise_for_status()
        return url, len(response.content), None
    except Exception as e:
        return url, 0, str(e)


def concurrent_download(urls, max_workers=10):
    """Download *urls* concurrently.

    Returns {'success': [{'url', 'size'}, ...], 'failed': [{'url', 'error'}, ...]}.
    """
    results = {'success': [], 'failed': []}
    # One shared session → connection pooling across all worker threads.
    session = requests.Session()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(download_file, url, session): url
            for url in urls
        }

        for future in as_completed(future_to_url):
            # download_file catches its own exceptions, so result() is safe.
            # FIX: the original first read url from future_to_url and then
            # immediately overwrote it with the tuple value — dead store removed.
            url, size, error = future.result()

            if error:
                results['failed'].append({'url': url, 'error': error})
            else:
                results['success'].append({'url': url, 'size': size})

    return results


# Guard (fix): the original fired 100 real HTTP requests at import time.
if __name__ == '__main__':
    urls = [f"https://example.com/file{i}.pdf" for i in range(100)]
    results = concurrent_download(urls, max_workers=20)

    print(f"成功: {len(results['success'])}, 失败: {len(results['failed'])}")

总结

Python 并发编程要点:

方案 适用场景 GIL 资源开销
threading IO 密集型 受限 中等(每线程约 8MB 栈)
multiprocessing CPU 密集型 绕过 大(独立进程与内存)
asyncio 高并发 IO 单线程,不受影响 最小
concurrent.futures 通用 取决于底层 取决于底层

选择原则:

  • IO 密集:优先 asyncio,其次 threading
  • CPU 密集:multiprocessing
  • 混合型:asyncio + multiprocessing(asyncio 处理 IO,子进程处理计算)