插件窝 干货文章 Pandas大数据帧写入CSV耗时过长如何高效解决?

Pandas大数据帧写入CSV耗时过长如何高效解决?

写入 使用 CSV 并行 951    来源:    2025-03-13

在处理大数据帧时,将Pandas DataFrame写入CSV文件可能会非常耗时,尤其是当数据量非常大时。以下是一些高效解决这个问题的策略:

1. 使用dask.dataframe

Dask是一个并行计算库,它可以处理比内存大的数据集。你可以使用dask.dataframe来替代Pandas DataFrame,它提供了与Pandas类似的API,但能够并行处理数据。

import dask.dataframe as dd

# 将Pandas DataFrame转换为Dask DataFrame
ddf = dd.from_pandas(df, npartitions=10)

# 写入CSV文件
ddf.to_csv('output-*.csv', index=False)

2. 使用multiprocessing并行写入

你可以使用Python的multiprocessing模块来并行地将数据写入多个CSV文件。

import pandas as pd
import numpy as np
from multiprocessing import Pool

def write_csv(args):
    df, path = args
    df.to_csv(path, index=False)

# 假设df是你的大数据帧
df = pd.DataFrame(np.random.randn(1000000, 10))

# 将数据帧分割成多个部分
num_partitions = 10
df_split = np.array_split(df, num_partitions)

# 创建路径列表
paths = [f'output_part_{i}.csv' for i in range(num_partitions)]

# 使用多进程并行写入
with Pool() as pool:
    pool.map(write_csv, zip(df_split, paths))

3. 使用fastparquetpyarrow写入Parquet文件

Parquet是一种列式存储格式,通常比CSV更高效。你可以使用fastparquetpyarrow库将DataFrame写入Parquet文件。

import pandas as pd
import numpy as np

# 假设df是你的大数据帧
df = pd.DataFrame(np.random.randn(1000000, 10))

# 使用fastparquet写入Parquet文件
df.to_parquet('output.parquet', engine='fastparquet')

# 或者使用pyarrow
df.to_parquet('output.parquet', engine='pyarrow')

4. 使用chunksize参数分块写入

如果你仍然希望使用CSV格式,可以使用to_csvchunksize参数来分块写入数据。

import pandas as pd
import numpy as np

# 假设df是你的大数据帧
df = pd.DataFrame(np.random.randn(1000000, 10))

# 分块写入CSV文件
chunksize = 100000
for i, chunk in enumerate(range(0, len(df), chunksize)):
    df.iloc[chunk:chunk + chunksize].to_csv(f'output_chunk_{i}.csv', index=False)

5. 使用modin.pandas替代Pandas

Modin是一个替代Pandas的库,它使用Ray或Dask作为后端,能够自动并行化操作。

import modin.pandas as pd
import numpy as np

# 假设df是你的大数据帧
df = pd.DataFrame(np.random.randn(1000000, 10))

# 写入CSV文件
df.to_csv('output.csv', index=False)

6. 使用csv.writer直接写入CSV

如果你需要更细粒度的控制,可以直接使用Python的csv.writer来写入CSV文件。

import pandas as pd
import numpy as np
import csv

# 假设df是你的大数据帧
df = pd.DataFrame(np.random.randn(1000000, 10))

# 打开CSV文件并写入
with open('output.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(df.columns)  # 写入列名
    for row in df.itertuples(index=False, name=None):
        writer.writerow(row)

7. 使用joblib并行化写入

joblib是一个用于并行计算的库,可以用来并行化写入操作。

import pandas as pd
import numpy as np
from joblib import Parallel, delayed

def write_csv(df, path):
    df.to_csv(path, index=False)

# 假设df是你的大数据帧
df = pd.DataFrame(np.random.randn(1000000, 10))

# 将数据帧分割成多个部分
num_partitions = 10
df_split = np.array_split(df, num_partitions)

# 创建路径列表
paths = [f'output_part_{i}.csv' for i in range(num_partitions)]

# 使用joblib并行写入
Parallel(n_jobs=num_partitions)(delayed(write_csv)(df, path) for df, path in zip(df_split, paths))

总结

根据你的具体需求和环境,可以选择上述方法中的一种或多种组合来优化Pandas大数据帧写入CSV的性能。通常,使用并行化或更高效的存储格式(如Parquet)可以显著提高写入速度。