1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| import pandas as pd import gc
def process_large_csv(input_file, output_file, chunk_size=500_000):
    """Process a multi-GB CSV in chunks and write per-user aggregates.

    Streams *input_file* with ``pd.read_csv(chunksize=...)``, drops rows
    whose ``status`` is ``'deleted'``, and writes one row per ``user_id``
    to *output_file* containing the total ``quantity`` and the row-weighted
    mean ``price``.

    Parameters
    ----------
    input_file : str
        Path to the source CSV. Must contain the columns listed in
        ``dtypes`` below.
    output_file : str
        Path for the aggregated result CSV (written without the index).
    chunk_size : int, optional
        Rows per chunk (default ``500_000``). Exposed so callers and
        tests can tune memory use; the default preserves the original
        behavior.

    Raises
    ------
    Propagates pandas errors for missing columns, unparseable dtypes,
    or malformed ``created_at`` values.
    """
    print("分析文件...")
    df_sample = pd.read_csv(input_file, nrows=1000)
    print(f"列: {df_sample.columns.tolist()}")
    print(f"样本内存: {df_sample.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

    # Narrow dtypes substantially cut per-chunk memory vs. pandas defaults
    # (int64/float64/object); 'category' is cheap for the low-cardinality
    # status column.
    dtypes = {
        'id': 'int32',
        'user_id': 'int32',
        'product_id': 'int32',
        'quantity': 'int16',
        'price': 'float32',
        'status': 'category',
        'created_at': 'string',
    }

    processed_chunks = []

    print("开始处理...")
    for i, chunk in enumerate(pd.read_csv(
        input_file,
        chunksize=chunk_size,
        dtype=dtypes,
        usecols=list(dtypes.keys()),
    )):
        chunk = chunk[chunk['status'] != 'deleted']

        # Parsed for validation only (raises on malformed dates); the
        # aggregation below does not use this column.
        chunk['created_at'] = pd.to_datetime(chunk['created_at'])

        # BUG FIX: the original stored the per-chunk *mean* of price and
        # then averaged those means in the final pass, which mis-weights
        # any user whose rows are spread unevenly across chunks. Keep the
        # per-chunk sum and row count instead so the exact global mean can
        # be reconstructed after concatenation.
        agg = chunk.groupby('user_id').agg(
            quantity=('quantity', 'sum'),
            price_sum=('price', 'sum'),
            price_count=('price', 'count'),
        ).reset_index()

        processed_chunks.append(agg)

        if (i + 1) % 10 == 0:
            print(f"已处理 {(i+1) * chunk_size:,} 行")

        # Drop the reference so the chunk is reclaimed before the next
        # read. CPython's refcounting frees it immediately — the original
        # per-iteration gc.collect() was pure overhead and is removed.
        del chunk

    print("合并结果...")
    result = pd.concat(processed_chunks, ignore_index=True)

    # Second-level reduction: sums and counts add exactly across chunks.
    merged = result.groupby('user_id').agg(
        quantity=('quantity', 'sum'),
        price_sum=('price_sum', 'sum'),
        price_count=('price_count', 'sum'),
    ).reset_index()
    # Row-weighted global mean: total price sum / total row count.
    merged['price'] = merged['price_sum'] / merged['price_count']
    final = merged[['user_id', 'quantity', 'price']]

    print("保存结果...")
    final.to_csv(output_file, index=False)

    print(f"完成!最终结果: {len(final):,} 行")
    print(f"内存占用: {final.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
if __name__ == "__main__":
    # Guarded entry point: the original called process_large_csv at module
    # level, which triggered a multi-GB file read as a side effect of
    # merely importing this module.
    process_large_csv('input.csv', 'output.csv')
|