Python offers three main approaches to concurrency: multithreading (threading), multiprocessing (multiprocessing), and coroutines (asyncio). Each has its own use cases, and understanding how they differ is a prerequisite for writing efficient concurrent code.
## The GIL: Impact and Limitations

Before discussing specific approaches, you must understand Python's GIL (Global Interpreter Lock).
### What the GIL is

The GIL is a CPython mechanism: at any given moment, only one thread executes Python bytecode. This means multithreading cannot achieve true parallelism on CPU-bound tasks.
```python
import threading
import time

def cpu_task(n):
    result = 0
    for i in range(n):
        result += i ** 2
    return result

# Baseline: one call on the main thread
start = time.time()
cpu_task(10_000_000)
print(f"single thread: {time.time() - start:.2f}s")

# Two threads, each doing the same work; with true parallelism
# this would take about as long as the single call above
start = time.time()
t1 = threading.Thread(target=cpu_task, args=(10_000_000,))
t2 = threading.Thread(target=cpu_task, args=(10_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"two threads: {time.time() - start:.2f}s")
```
Result: the two threads give no speedup, and the run can even be slower because of thread-switching overhead.
### Where the GIL matters

| Scenario | GIL impact | Reason |
| --- | --- | --- |
| CPU-bound | Severe | Only one thread executes at a time |
| IO-bound | Minor | Threads release the GIL while waiting on IO |
| C extension calls | None | Extensions can release the GIL |
| NumPy operations | None | NumPy releases the GIL internally |
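The "C extension" row can be observed with the standard library alone: hashlib releases the GIL while hashing buffers larger than 2047 bytes, so two hashing threads can actually overlap. A minimal sketch (the buffer size and exact timings are illustrative and will vary by machine):

```python
import hashlib
import threading
import time

data = b"x" * (32 * 1024 * 1024)  # 32 MB buffer

def hash_it():
    # hashlib releases the GIL for data larger than 2047 bytes,
    # so this C-level work can run in parallel with other threads
    hashlib.sha256(data).hexdigest()

# Serial: two hashes back to back
start = time.time()
hash_it()
hash_it()
serial = time.time() - start

# Threaded: the same two hashes, overlapped
start = time.time()
t1 = threading.Thread(target=hash_it)
t2 = threading.Thread(target=hash_it)
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.time() - start

print(f"serial: {serial:.3f}s, threaded: {threaded:.3f}s")
```

On a multi-core machine the threaded run typically approaches half the serial time, unlike the pure-Python `cpu_task` above.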
### The workaround

For CPU-bound work, use processes instead of threads:

```python
from multiprocessing import Pool

# each worker is a separate process with its own interpreter and GIL;
# cpu_task is the function defined above
with Pool(4) as p:
    results = p.map(cpu_task, [10_000_000] * 4)
```
## The threading module in practice

Multithreading is a good fit for IO-bound tasks.
### Basic usage

```python
import threading
import time

def fetch_url(url):
    """Simulate an HTTP request"""
    print(f"start request: {url}")
    time.sleep(1)  # simulated IO wait; the GIL is released during sleep
    print(f"finished request: {url}")
    return f"result: {url}"

# Serial
start = time.time()
for url in ['url1', 'url2', 'url3']:
    fetch_url(url)
print(f"serial: {time.time() - start:.2f}s")

# Threaded
start = time.time()
threads = []
for url in ['url1', 'url2', 'url3']:
    t = threading.Thread(target=fetch_url, args=(url,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"threaded: {time.time() - start:.2f}s")
```
### Inter-thread communication: Queue

```python
import threading
from queue import Queue

def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"produced: {i}")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # sentinel: shut down
            break
        print(f"consumed: {item}")
        queue.task_done()

q = Queue()

producer_thread = threading.Thread(target=producer, args=(q,))
producer_thread.start()

consumers = []
for _ in range(2):
    t = threading.Thread(target=consumer, args=(q,))
    consumers.append(t)
    t.start()

producer_thread.join()
# one sentinel per consumer so each one wakes up and exits
for _ in range(2):
    q.put(None)
for t in consumers:
    t.join()
```
### Thread synchronization: Lock

```python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100_000):
        with lock:  # without the lock, concurrent updates can be lost
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"counter: {counter}")  # 400000
```
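Why the lock is needed: `counter += 1` is not atomic. It compiles to separate load, add, and store bytecodes, and a thread switch between any two of them loses an update. You can see the individual steps with the standard `dis` module (exact opcode names vary across Python versions):

```python
import dis

counter = 0

def increment_once():
    global counter
    counter += 1

# Prints the bytecode: a load of `counter`, an in-place add,
# and a store back to the global - three separate steps a thread
# can be preempted between.
dis.dis(increment_once)
```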
## The multiprocessing module in practice

Multiple processes sidestep the GIL entirely, which makes them the right tool for CPU-bound tasks.
### Basic usage

```python
from multiprocessing import Process, cpu_count
import time

def cpu_task(n):
    result = 0
    for i in range(n):
        result += i ** 2
    return result

if __name__ == '__main__':
    print(f"CPU cores: {cpu_count()}")

    start = time.time()
    cpu_task(10_000_000)
    print(f"single process: {time.time() - start:.2f}s")

    start = time.time()
    processes = []
    for _ in range(4):
        p = Process(target=cpu_task, args=(10_000_000,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"four processes: {time.time() - start:.2f}s")
```
### Pool: a higher-level interface

```python
from multiprocessing import Pool

def heavy_compute(n):
    return sum(i ** 2 for i in range(n))

if __name__ == '__main__':
    # map: blocks until all results are ready, preserves input order
    with Pool(4) as pool:
        results = pool.map(heavy_compute, [1_000_000] * 8)
        print(f"results: {results}")

    # imap: lazy iterator, yields results as you consume them
    with Pool(4) as pool:
        for result in pool.imap(heavy_compute, [1_000_000] * 8):
            print(f"done: {result}")

    # apply_async: submit a single task, fetch the result later
    with Pool(4) as pool:
        async_result = pool.apply_async(heavy_compute, (1_000_000,))
        print(f"async result: {async_result.get()}")

    # callback: runs in the parent process when the task finishes
    def callback(result):
        print(f"task finished: {result}")

    with Pool(4) as pool:
        r = pool.apply_async(heavy_compute, (1_000_000,), callback=callback)
        r.wait()  # ensure the task completes before the pool shuts down
```
### Inter-process communication

```python
from multiprocessing import Process, Queue, Pipe, Manager

# Queue: many producers/consumers; here one queue in, one queue out
def worker_queue(in_q, out_q):
    while True:
        item = in_q.get()
        if item is None:
            break
        out_q.put(item * 2)

# Pipe: a two-way connection between exactly two endpoints
def worker_pipe(conn):
    while True:
        msg = conn.recv()
        if msg == 'quit':
            break
        conn.send(msg.upper())

# Manager: proxied shared objects (dict, list, ...) across processes
def worker_manager(shared_dict):
    # note: this read-modify-write is not atomic; guard it with a
    # manager Lock in real code
    shared_dict['count'] = shared_dict.get('count', 0) + 1

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker_pipe, args=(child_conn,))
    p.start()
    parent_conn.send('hello')
    print(parent_conn.recv())  # HELLO
    parent_conn.send('quit')
    p.join()

    manager = Manager()
    shared = manager.dict()
    processes = [Process(target=worker_manager, args=(shared,)) for _ in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(shared)
```
## concurrent.futures: the high-level interface

concurrent.futures, introduced in Python 3.2, is a high-level interface that wraps both threading and multiprocessing.
### ThreadPoolExecutor

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_data(url):
    time.sleep(1)
    return f"result: {url}"

urls = [f"url_{i}" for i in range(10)]

# map: results come back in input order
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(fetch_data, urls)
    for r in results:
        print(r)

# submit + as_completed: handle results as they finish
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fetch_data, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"{url} -> {result}")
        except Exception as e:
            print(f"{url} failed: {e}")
```
### ProcessPoolExecutor

```python
from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    numbers = range(1_000_000)
    # chunksize batches items per worker to cut down on IPC overhead
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(is_prime, numbers, chunksize=10_000)
        primes = [n for n, is_p in zip(numbers, results) if is_p]
    print(f"found {len(primes)} primes")
```
### Waiting on futures

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def task(i):
    time.sleep(i * 0.1)
    return i

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    # return as soon as the first future finishes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(f"done: {len(done)}, not done: {len(not_done)}")
```
## Choosing between coroutines and threads

### Performance comparison

```python
import asyncio
import threading
import time

async def async_io():
    await asyncio.sleep(0.1)

def sync_io():
    time.sleep(0.1)

async def main():
    await asyncio.gather(*[async_io() for _ in range(100)])

start = time.time()
asyncio.run(main())
print(f"asyncio: {time.time() - start:.2f}s")

start = time.time()
threads = [threading.Thread(target=sync_io) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"threading: {time.time() - start:.2f}s")
```
### Selection guide

| Scenario | Recommended | Reason |
| --- | --- | --- |
| Concurrent HTTP requests | asyncio / threading | IO-bound |
| Batch file processing | asyncio / threading | IO-bound |
| CPU-bound computation | multiprocessing | Bypasses the GIL |
| Image processing | multiprocessing | Bypasses the GIL |
| Real-time responsiveness | asyncio | Event-driven |
| Simple parallelism | concurrent.futures | Unified interface |
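The "unified interface" row is concrete: ThreadPoolExecutor and ProcessPoolExecutor implement the same Executor API, so switching strategies is a one-line change. A small sketch (`square` and `run` are illustrative names):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(n):
    return n * n

def run(executor_cls):
    # identical map/submit API regardless of the executor class
    with executor_cls(max_workers=4) as executor:
        return list(executor.map(square, range(10)))

if __name__ == '__main__':
    print(run(ThreadPoolExecutor))   # threads: good for IO-bound work
    print(run(ProcessPoolExecutor))  # processes: good for CPU-bound work
```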
## Hands-on: a concurrent downloader

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def download_file(url, session=None):
    """Download a single file"""
    session = session or requests.Session()
    try:
        response = session.get(url, timeout=10)
        response.raise_for_status()
        return url, len(response.content), None
    except Exception as e:
        return url, 0, str(e)

def concurrent_download(urls, max_workers=10):
    """Download multiple files concurrently"""
    results = {'success': [], 'failed': []}
    # sharing one Session reuses connections; note that Session
    # thread-safety is not officially guaranteed by requests
    session = requests.Session()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(download_file, url, session): url
            for url in urls
        }
        for future in as_completed(future_to_url):
            # download_file catches its own exceptions, so result() is safe
            url, size, error = future.result()
            if error:
                results['failed'].append({'url': url, 'error': error})
            else:
                results['success'].append({'url': url, 'size': size})
    return results

urls = [f"https://example.com/file{i}.pdf" for i in range(100)]
results = concurrent_download(urls, max_workers=20)
print(f"success: {len(results['success'])}, failed: {len(results['failed'])}")
```
## Summary

Key points of Python concurrent programming:
| Approach | Best for | GIL | Overhead |
| --- | --- | --- | --- |
| threading | IO-bound | Constrained | Small |
| multiprocessing | CPU-bound | Bypassed | Large |
| asyncio | High-concurrency IO | Not an issue | Minimal |
| concurrent.futures | General purpose | Depends on backend | Depends on backend |
Selection principles:

- IO-bound: prefer asyncio, then threading
- CPU-bound: multiprocessing
- Mixed: asyncio + multiprocessing (asyncio handles the IO, worker processes handle the computation)
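The mixed pattern in the last bullet can be sketched with `loop.run_in_executor`, which lets a coroutine offload CPU-bound work to a process pool without blocking the event loop (the task functions here are illustrative):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    # CPU-bound work: runs in a worker process, off the event loop
    return sum(i * i for i in range(n))

async def fetch(i):
    # IO-bound work: stays on the event loop
    await asyncio.sleep(0.1)
    return i

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # IO tasks and the CPU task make progress concurrently
        io_results, cpu_result = await asyncio.gather(
            asyncio.gather(*[fetch(i) for i in range(5)]),
            loop.run_in_executor(pool, cpu_task, 100_000),
        )
    print(io_results, cpu_result)
    return io_results, cpu_result

if __name__ == '__main__':
    asyncio.run(main())
```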