DataFrame.to_csv函数写入文件速度缓慢的解决方案

Pandas里的DataFrame.to_csv()是一个非常常用的函数,用于将内存中的数据以csv格式写到磁盘上,但当要写入的内容较多时,往往会遇到耗时过长的问题。这个问题的原因是to_csv()内部优化不够,我们可以利用其他软件包来曲线的解决此问题。

验证环境

硬件:

  • Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz 2.81 GHz
  • 32.0 GB / 512GB SSD

软件:

  • Windows 10 22H2
  • Python 3.9.13
  • pandas 2.2.2
  • numpy 1.26.4
  • duckdb 1.0.0
  • pyarrow 16.1.0

验证过程

首先随机生成一个较大尺寸的dataframe,内部所有数值都是float64类型,直接调用to_csv()写到磁盘并记录耗时;然后将此dataframe分别转换为numpyduckdbpyarrow的对应数据结构(转换过程本身耗时很短可忽略),并统计每次写入csv的耗时(毫秒),整理为下表:

DataFrame大小 CSV文件大小 pandas耗时 numpy耗时 duckdb耗时 pyarrow耗时
(10000,100) 17MB 1503 948 529 218
(50000,100) 87MB 7598 4815 2683 1107
(10000,500) 87MB 7341 4381 2811 1085

与Pandas.to_csv()对比的性能倍数关系:

DataFrame大小 CSV文件大小 pandas倍数 numpy倍数 duckdb倍数 pyarrow倍数
(10000,100) 17MB 1 1.6 2.8 6.9
(50000,100) 87MB 1 1.6 2.8 6.9
(10000,500) 87MB 1 1.7 2.6 6.8

注:验证过dask多partition的写入方式,但所得到的不是单个文件因此没有放入表格,并且写入耗时与pandas原生相差不大。

验证结论

可以看到在不同尺寸的数据量下,各软件包的性能都有所提升并且提升幅度比较稳定,其中pyarrow性能最佳,是pandas原生的接近7倍。建议对性能有要求的情况下,将dataframe转换为pyarrow table后再写入。

验证代码:

import pandas as pd
import numpy as np
import time

# 造dataframe数据
np.random.seed(42)
data = np.random.uniform(0, 100, size=(10000, 100))
df = pd.DataFrame(data)

# 直接用to_csv写入文件
# 验证过chunksize, index=False等参数对写入时间影响不大
t0 = time.time() * 1000
df.to_csv('c:/temp/1.csv')
t1 = time.time() * 1000
print(f'pandas cost {t1-t0} ms')

# df转换到numpy后写入csv文件
import numpy as np
numpy_array = df.values
t0 = time.time() * 1000
np.savetxt('c:/temp/1.csv', numpy_array, delimiter=',', fmt='%s')
t1 = time.time() * 1000
print(f'numpy cost {t1-t0} ms')

# df转换到duckdb表后写入csv文件
import duckdb
con = duckdb.connect(database=':memory:', read_only=False)
con.register('my_table', df)
t0 = time.time() * 1000
con.execute("COPY (SELECT * FROM my_table) TO 'c:/temp/2.csv' WITH (FORMAT CSV)")
t1 = time.time() * 1000
print(f'duckdb cost {t1-t0} ms')

# df转换到arrow表后写入csv文件
import pyarrow as pa
import pyarrow.csv as csv
table = pa.Table.from_pandas(df)
t0 = time.time() * 1000
csv.write_csv(table, 'c:/temp/1.csv')
t1 = time.time() * 1000
print(f'pyarrow cost {t1-t0} ms')

参考链接

Pandas向量化计算与for循环对比

对pandas DataFrame数据进行相同类型的计算时,对比使用向量化方式与for循环方式的时间开销差异。

TLDR

浮点数计算速度:apply比for循环快4.8倍,向量化比for循环快6900倍。

字符串计算速度:apply比for循环快5.0倍,向量化比for循环快580倍。

因此建议尽量避免使用for循环的方式处理dataframe数据。

一、测试环境:

Apple M2,16GB,SSD

python 3.11.7

pandas 2.1.4

二、求和两列浮点数

用不同的内存对象计算两列之和(new_col=A+B),比较所消耗的时间,列A和列B都是1~100之间的浮点数。

file

向量化速度提升倍数:

file

apply比for循环提升4.8倍左右,向量化比for循环提升3000~9000倍(平均6900倍)。

三、字符串连接

用不同的内存对象将两列字符串连接,比较所消耗的时间,列A和列B都是8字节的字符串。

file

向量化速度提升倍数:

file

apply比for循环提升5.0倍左右,向量化比for循环提升430~650倍(平均580倍)。

Python代码单元测试和性能测试

本文介绍如何使用pytest对python工程进行单元测试和统计测试覆盖率,同时介绍如何测试python程序的运行速度和内存消耗。

一、工程目录结构

测试用例统一放在tests目录下,tests内部目录结构与被测代码的目录结构相同,以便快速定位测试用例代码。

file

如果环境还没有安装过pytest,先执行pip install pytest进行安装。

二、测试用例内容

测试用例的原则是逻辑尽量简单直白,这样当用例运行失败时能够较快的通过调试定位到问题原因,建议在测试代码里添加必要的注释。另外注意下面几个规则:

  • 文件名必须以test_开头,这样pytest命令可以正确识别此文件为测试用例。
  • 文件里每个测试方法必须以test_开头,原因同上。
  • 如果测试用例引用外部文件,可将文件放在同目录下,然后用os.path.dirname(file)引用py文件所在目录,确保从各个目录执行此测试用例都能正确找到外部文件。
  • 如果测试用例生成新的文件,建议用pytest提供的tmp_path参数(文档)作为输出目录,此临时目录下的文件会被自动清理,以免影响测试用例的下次运行。

以下是一个测试用例的例子(test_adapter_csv.py):

from acme.datasets.adapter import *
import os
import pandas as pd

# 验证正常加载csv文件
def test_read_file_to_df():
    df = CSVHandler.read_file_to_df(os.path.dirname(__file__), file_type=CSVInputFile.BUFFER)
    assert len(df) == 16

# 验证数据正常写出到csv文件
def test_write_df_to_csv(tmp_path):
    df = pd.DataFrame()
    CSVHandler.write_df_to_csv(df, path=str(tmp_path), file_type=CSVOutputFile.LOT_FLOW)
    assert os.path.exists(os.path.join(str(tmp_path), 'RESULT.csv'))

为提高代码覆盖率,我们不能只验证输入正常的情况,还应该对处理异常的情况进行验证:

# 验证要加载的csv文件不存在的情况
def test_read_file_to_df_not_exist():
    with pytest.raises(FileNotFoundError):
        df = CSVHandler.read_file_to_df(os.path.dirname(__file__), file_type=CSVInputFile.DEMAND)

三、执行测试用例

方式1、在IDE里可直接执行test_xxx.py文件里的指定方法
file

方式2、在IDE里右键点击tests目录选择执行所有测试用例
file

方式3、在命令行里执行pytest命令可以执行所有测试用例
file

四、统计测试覆盖率

PyCharm专业版里内置了覆盖率工具,在运行时选择"Run with coverage“即可,这里介绍PyCharm社区版如何统计测试覆盖率。

首先确保已安装测试覆盖率组件:

pip install pytest-cov
pip install pytest-html

并且确保在tests目录内的每个目录下已包含init.py文件(否则统计时会忽略此目录下的测试用例)。
file

然后在命令行里执行:

pytest --cov=acme --cov-report=html

即可生成测试覆盖率结果文件,放在htmlcov目录下,打开其中的index.html可查看结果。

即使只有一个测试用例,工程的总体覆盖率也已经达到28%,这是因为有很多import、def代码在加载模块时被覆盖到,而实际的业务逻辑代码并没有被覆盖。
file

file

如果忽略 --cov-report=html 参数则会在控制台里输出每个py文件的覆盖率报告,但不会包含逐行的覆盖率结果。
file

五、性能测试(时间)

方案1:cProfile+Snakeviz

首先安装pytest-profiling组件和snakeviz查看工具:

pip install pytest-profiling
pip install snakeviz

在命令行里执行pytest时添加–profile参数即可统计运行时间:

pytest tests/datasets/test_adapter_csv.py --profile

file

同时会生成二进制格式的统计文件:
file

使用snakeviz工具查看指定prof文件的内容:

snakeviz prof/combined.prof

file

方案2、Viztracer(推荐)

viztracer查看结果的交互方式更加友好,有利于更快速的定位问题,因此推荐使用。

首先安装viztracer组件:

pip install viztracer

用viztracer命令运行指定的测试用例:

viztracer --min_duration 0.1ms tests/datasets/test_adapter_csv.py
vizviewer result.json

运行结束后会生成result.json文件:
file

用vizviewer命令查看result.json的内容(会自动启动浏览器):

vizviewer result.json

file

六、性能测试(内存)

a. 整体内存分析

首先安装必要的组件:

pip install memory_profiler
pip install matplotlib

使用mprof运行指定的测试用例:

mprof run tests/datasets/test_adapter_csv.py

每次运行结束后会生成一个.dat文件:
file

使用mprof plot命令查看最新一个.dat文件的内容,若不指定文件名则自动查看最新一个文件,图形展示了测试用例执行过程中消耗的内存变化情况:

mprof plot mprofile_20240410185255.dat

file

b. 逐行内存分析

有时我们希望分析某个函数或者某几行代码的内存占用,可以用@profile修饰要分析的函数,然后在PyCharm里点击测试函数旁边的运行按钮启动测试用例(命令行里启动不行)。当运行完成后,在控制台里会输出此函数的逐行内存变化。

file

逐行内存占用情况如下图所示,从图中可以看到,读取DataFrame的操作新增占用了0.6MB的内存。

file

我们也可以用@profile标记多个函数,在PyCharm里批量运行测试用例,这样在测试报告里可以分别查看这些函数的逐行内存占用情况:
file

内存向量化计算性能比较

本文比较pandas、polars、duckdb和arrow等几种类库,在内存中进行向量化计算的执行时间差异。

测试环境

硬件:
Apple M2,16GB,SSD

软件:
python 3.11.7
pandas 2.1.4
pyarrow 14.0.2
duckdb 0.10.1
polars 0.20.15

求和两列浮点数

用不同的内存对象计算两列之和(new_col=A+B),比较所消耗的时间,列A和列B都是1~100之间的浮点数。

file

内存压力大时耗时会显著增加,例如下面的pandas和pandas_on_arrow曲线:

file

计算指定列平均值

用不同的内存对象计算指定列的平均值,比较所消耗的时间,指定列是1~100之间的浮点数。

file

字符串连接

用不同的内存对象将两列字符串连接,比较所消耗的时间,列A和列B都是8字节的字符串。

file

开发PDI(Kettle) Step Plugin

Pentaho Data Integration (PDI)是著名的ETL工具Kettle的现用名,这个工具允许用户以图形化的方式构造数据处理流程,除了内置丰富的数据处理节点以外,还允许用户自定义开发自己的数据处理节点以便实现更复杂或更定制的处理逻辑,使用的开发语言是Java。

在某项目里为了实现一个项目特定的数据转换,我开发了一个这样的处理节点,整体感受还是十分流畅的,记录如下(代码略):

从github克隆项目:

git clone git@github.com:pentaho/pdi-sdk-plugins.git

里面包含的kettle-sdk-step-plugin模块即是一个例子,可以基于这个模块的代码按需修改。
开发完成后运行自带的三个测试用例,这样部署后成功率比较高。
打包命令仍然是:

mvn package

打好的是一个jar包,放在${pdi_path}/plugins/steps/<demo_plugin_name>/目录下即可,如果目录不存在可按需创建。