Python 数据处理踩坑:pandas 读取大文件内存爆炸与优化

用 pandas 处理大数据时,最常见的问题是内存爆炸。几 GB 的 CSV 读取后内存飙升到十几 GB,甚至 OOM。本文总结内存爆炸的原因和优化方案。

内存爆炸原因分析

pandas 内存占用模型

1
2
3
4
5
6
7
8
9
10
11
12
import pandas as pd

# 100万行 x 100列的数据
df = pd.DataFrame({
'id': range(1_000_000),
'name': ['user_' + str(i) for i in range(1_000_000)],
'value': [i * 0.1 for i in range(1_000_000)],
'flag': [True if i % 2 == 0 else False for i in range(1_000_000)]
})

# 查看内存占用
df.info(memory_usage='deep')

输出:

1
2
3
4
5
6
7
Data columns (total 4):
id int64 8.0 MB
name object 104.6 MB # 字符串占用大
value float64 8.0 MB
flag bool 1.0 MB
----------------
Total: 121.6 MB

内存占用分析

数据类型 每列大小(1M 行)
int64 8 MB
float64 8 MB
bool 1 MB
object (字符串) 100+ MB
datetime 8 MB

字符串(object)是内存杀手:每个字符串都有额外开销,实际占用远超字符串本身。

常见误区

1
2
3
4
5
6
7
8
9
# 误区1:读取整个文件
df = pd.read_csv('big_file.csv') # 一次性读取全部

# 误区2:不做任何类型优化
df = pd.read_csv('big_file.csv') # 所有列都推断为 object 或 float64

# 误区3:多次复制 DataFrame
df1 = df.copy() # 内存翻倍
df2 = df1.copy() # 再翻倍

分块读取策略

基本分块读取

1
2
3
4
5
6
7
8
9
10
11
# 每次读取 10万行
chunk_size = 100_000
results = []

for chunk in pd.read_csv('big_file.csv', chunksize=chunk_size):
# 处理每一块
processed = process_chunk(chunk)
results.append(processed)

# 合并结果
final_df = pd.concat(results, ignore_index=True)

处理大文件示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def process_chunk(chunk):
"""处理单个数据块"""
# 过滤
chunk = chunk[chunk['status'] == 'active']
# 转换
chunk['date'] = pd.to_datetime(chunk['date'])
# 聚合
return chunk.groupby('category')['value'].sum()

# 分块处理
chunk_size = 500_000
aggregated_results = []

for chunk in pd.read_csv('big_file.csv', chunksize=chunk_size):
result = process_chunk(chunk)
aggregated_results.append(result)

# 最终合并
final_result = pd.concat(aggregated_results).groupby(level=0).sum()

读取时筛选行

1
2
3
4
5
6
7
8
9
10
11
# 只读取需要的行
# 假设文件很大,我们只需要 2024 年的数据

# 方案1:skiprows
# 先计算要跳过的行数
# skiprows = lambda x: x > 0 and pd.read_csv(...).iloc[:, 0].dtype == ...

# 方案2:读取后过滤
# 更简单,但浪费读取带宽

# 方案3:使用数据库或 Parquet

dtype 优化技巧

指定数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
# 优化前:所有数字都推断为 int64/float64
df = pd.read_csv('data.csv')

# 优化后:指定合适的数据类型
dtypes = {
'id': 'int32', # int64 -> int32(节省 50%)
'age': 'int8', # 如果年龄在 0-255 之间
'amount': 'float32', # float64 -> float32(节省 50%)
'is_active': 'bool', # object -> bool
'category': 'category', # 重复字符串使用 category
}

df = pd.read_csv('data.csv', dtype=dtypes)

category 类型

category 是 pandas 对重复字符串的优化类型,内部用整数编码:

1
2
3
4
5
6
7
8
9
10
11
# 普通 object
s = pd.Series(['apple'] * 100000 + ['banana'] * 100000)
s.memory_usage(deep=True) # ~1.6 MB

# category 类型
s_cat = s.astype('category')
s_cat.memory_usage(deep=True) # ~0.2 MB

# 对比:
# object:每个字符串占用 ~50 字节
# category:整数编码 + 字符串字典,节省 10-50 倍

memory_usage 指定

1
2
3
4
5
6
# 查看优化后的内存
df.info(memory_usage='deep')

# 对比优化前后
print(f"原始: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"优化: {df_opt.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

常用 dtype 选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 整数选择
int8 # -128 to 127
int16 # -32,768 to 32,767
int32 # -2B to 2B
int64 # > 2B

# 根据数据范围选择最小的
df['count'].astype('int32')
df['flag'].astype('bool')

# 浮点数选择
float32 # 6-7 位精度
float64 # 15-16 位精度(默认)

# 时间选择
datetime64[ns] # 纳秒精度
datetime64[s] # 秒精度(节省空间)

选择性加载列

只读取需要的列

1
2
3
4
# 只读取部分列
usecols = ['id', 'name', 'value', 'created_at']

df = pd.read_csv('big_file.csv', usecols=usecols)

结合 dtype 和 usecols

1
2
3
4
5
6
7
8
9
10
11
dtypes = {
'id': 'int32',
'value': 'float32',
'category': 'category',
}

df = pd.read_csv(
'big_file.csv',
usecols=['id', 'value', 'category', 'name'], # name 会用默认类型
dtype=dtypes
)

延迟加载

1
2
3
4
5
6
7
8
# 先读取列名
columns = pd.read_csv('big_file.csv', nrows=0).columns.tolist()

# 选择需要的列
needed_cols = ['id', 'value', 'category']

# 重新读取
df = pd.read_csv('big_file.csv', usecols=needed_cols, dtype=dtypes)

数据类型转换注意

避免类型推断错误

1
2
3
4
5
6
# 有缺失值的列可能被推断为 float 而不是 int
df = pd.read_csv('data.csv')
# 如果 'count' 列有 NaN,会被推断为 float64

# 解决方案:读完后再转换
df['count'] = df['count'].astype('Int32') # 注意:使用大写 I 的 nullable int

nullable 类型

1
2
3
4
5
6
# pandas 1.0+ 支持 nullable 类型
df['count'] = df['count'].astype('Int32') # 可以存 NaN 的 int32
df['flag'] = df['flag'].astype('boolean') # 可以存 NaN 的 bool

# 不要用
df['count'] = df['count'].astype('int32') # NaN 会丢失

日期处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 指定日期列
df = pd.read_csv('data.csv', parse_dates=['date'])

# 指定日期格式(加速)
df = pd.read_csv('data.csv', parse_dates=['date'], date_format='%Y-%m-%d')

# 如果日期格式混乱,手动处理
def parse_date(x):
try:
return pd.to_datetime(x, format='%Y-%m-%d')
except:
return pd.NaT

df['date'] = df['date'].apply(parse_date)

实战:GB 级 CSV 处理

处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import pandas as pd
import gc

def process_large_csv(input_file, output_file):
"""处理 GB 级 CSV 文件"""

# 1. 先查看文件信息
print("分析文件...")
df_sample = pd.read_csv(input_file, nrows=1000)
print(f"列: {df_sample.columns.tolist()}")
print(f"样本内存: {df_sample.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# 2. 定义数据类型
dtypes = {
'id': 'int32',
'user_id': 'int32',
'product_id': 'int32',
'quantity': 'int16',
'price': 'float32',
'status': 'category',
'created_at': 'string', # 先读成 string
}

# 3. 分块处理
chunk_size = 500_000
processed_chunks = []

print("开始处理...")
for i, chunk in enumerate(pd.read_csv(
input_file,
chunksize=chunk_size,
dtype=dtypes,
usecols=list(dtypes.keys())
)):
# 过滤
chunk = chunk[chunk['status'] != 'deleted']

# 转换日期
chunk['created_at'] = pd.to_datetime(chunk['created_at'])

# 聚合(减少数据量)
agg = chunk.groupby('user_id').agg({
'quantity': 'sum',
'price': 'mean'
}).reset_index()

processed_chunks.append(agg)

# 每 10 个 chunk 打印一次进度
if (i + 1) % 10 == 0:
print(f"已处理 {(i+1) * chunk_size:,} 行")

# 释放 chunk 内存
del chunk
gc.collect()

# 4. 合并结果
print("合并结果...")
result = pd.concat(processed_chunks, ignore_index=True)

# 最终聚合
final = result.groupby('user_id').agg({
'quantity': 'sum',
'price': 'mean'
}).reset_index()

# 5. 保存
print("保存结果...")
final.to_csv(output_file, index=False)

print(f"完成!最终结果: {len(final):,} 行")
print(f"内存占用: {final.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# 运行
process_large_csv('input.csv', 'output.csv')

使用 Parquet 格式

Parquet 是列式存储,更适合大数据处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 安装 pyarrow(Parquet 支持)
pip install pyarrow

# 读取 Parquet(自动类型推断)
df = pd.read_parquet('data.parquet')

# 保存为 Parquet
df.to_parquet('output.parquet', engine='pyarrow', compression='snappy')

# Parquet 优势:
# - 列式存储,只读取需要的列
# - 支持压缩,文件更小
# - 类型信息保留,无需重新推断
# - 读取性能比 CSV 快 10-100 倍

使用 DuckDB 分析

对于超大数据集,直接用 SQL 更高效:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import duckdb

# 创建数据库连接
con = duckdb.connect('data.db')

# 直接用 SQL 分析 CSV
result = con.execute("""
SELECT
category,
SUM(value) as total,
AVG(value) as average
FROM read_csv_auto('big_file.csv')
WHERE status = 'active'
GROUP BY category
""").fetchdf()

# 或者直接查询 Parquet
result = con.execute("""
SELECT * FROM 'data.parquet'
WHERE date >= '2024-01-01'
""").fetchdf()

总结

pandas 大数据处理要点:

问题 解决方案
内存爆炸 分块读取、dtype 优化
字符串占用大 使用 category 类型
读取慢 使用 Parquet、使用数据库
类型推断错误 手动指定 dtype
NaN 和 int 使用 nullable 类型(Int32)

内存优化优先级:

  1. 选择性加载列:只读需要的列
  2. dtype 优化:最有效的优化
  3. 分块处理:控制内存峰值
  4. 使用 Parquet:列式存储 + 压缩
  5. 使用数据库:超大数据用 DuckDB/SQLite